[jboss-cvs] JBoss Messaging SVN: r6067 - in trunk: tests/jms-tests/src/org/jboss/test/messaging/jms and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 11 16:01:08 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-11 16:01:08 -0400 (Wed, 11 Mar 2009)
New Revision: 6067

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
Log:
New tests and fixes

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-11 17:16:35 UTC (rev 6066)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-11 20:01:08 UTC (rev 6067)
@@ -49,6 +49,7 @@
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -59,7 +60,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
@@ -125,7 +125,7 @@
 
    private final boolean xa;
 
-   private ClientXAState state = null;
+   private final ClientXAState state = null;
 
    private final Executor executor;
 
@@ -173,9 +173,9 @@
    private volatile boolean started;
 
    private SendAcknowledgementHandler sendAckHandler;
-    
+
    private volatile boolean closedSent;
-   
+
    // Constructors ----------------------------------------------------------------------------
 
    public ClientSessionImpl(final ConnectionManager connectionManager,
@@ -317,7 +317,7 @@
 
       return response;
    }
- 
+
    public ClientConsumer createConsumer(final SimpleString queueName) throws MessagingException
    {
       return createConsumer(queueName, null, false);
@@ -528,12 +528,12 @@
       {
          consumer.clear();
       }
-      
-      //Acks must be flushed here *after connection is stopped and all onmessages finished executing
+
+      // Acks must be flushed here *after connection is stopped and all onmessages finished executing
       flushAcks();
 
       channel.sendBlocking(new RollbackMessage(isLastMessageAsDelived));
-                           
+
       if (wasStarted)
       {
          start();
@@ -564,16 +564,15 @@
 
       return new ClientMessageImpl(durable, body);
    }
-   
+
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.client.impl.ClientSessionInternal#createBuffer(int)
     */
-   public MessagingBuffer createBuffer(int size)
+   public MessagingBuffer createBuffer(final int size)
    {
-      return ChannelBuffers.dynamicBuffer(size); 
+      return ChannelBuffers.dynamicBuffer(size);
    }
 
-
    public ClientFileMessage createFileMessage(final boolean durable)
    {
       return new ClientFileMessageImpl(durable);
@@ -659,7 +658,7 @@
       {
          return;
       }
-      
+
       checkClosed();
 
       SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
@@ -710,9 +709,9 @@
       if (consumer != null)
       {
          ClientMessageInternal clMessage = message.getClientMessage();
-         
+
          clMessage.setFlowControlSize(clMessage.getEncodeSize());
-         
+
          consumer.handleMessage(message.getClientMessage());
       }
    }
@@ -737,27 +736,27 @@
          consumer.handleLargeMessageContinuation(continuation);
       }
    }
-   
+
    public void close() throws MessagingException
    {
       if (closed)
       {
          return;
       }
-   
+
       try
       {
          closeChildren();
-         
+
          closedSent = true;
-         
-         channel.sendBlocking(new SessionCloseMessage());           
+
+         channel.sendBlocking(new SessionCloseMessage());
       }
       catch (Throwable ignore)
       {
          // Session close should always return without exception
       }
-      
+
       doCleanup();
    }
 
@@ -767,7 +766,7 @@
       {
          return;
       }
-      
+
       cleanUpChildren();
 
       doCleanup();
@@ -775,9 +774,9 @@
 
    public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
    {
-      this.channel.setCommandConfirmationHandler(this);
+      channel.setCommandConfirmationHandler(this);
 
-      this.sendAckHandler = handler;
+      sendAckHandler = handler;
    }
 
    // Needs to be synchronized to prevent issues with occurring concurrently with close()
@@ -787,7 +786,7 @@
       {
          return true;
       }
-      
+
       boolean ok = false;
 
       // We lock the channel to prevent any packets to be added to the resend
@@ -815,9 +814,9 @@
             ok = true;
          }
          else
-         {                        
+         {
             if (closedSent)
-            {            
+            {
                // a session re-attach may fail, if the session close was sent before failover started, hit the server,
                // processed, then before the response was received back, failover occurred, re-attach was attempted. in
                // this case it's ok - we don't want to call any failure listeners and we don't want to halt the rest of
@@ -828,11 +827,11 @@
                ok = true;
             }
             else
-            {                              
+            {
                log.warn(System.identityHashCode(this) + " Session not found on server when attempting to re-attach");
             }
-            
-            channel.returnBlocking();            
+
+            channel.returnBlocking();
          }
       }
       catch (Throwable t)
@@ -848,7 +847,7 @@
 
       return ok;
    }
-   
+
    public void returnBlocking()
    {
       channel.returnBlocking();
@@ -987,7 +986,7 @@
 
       ClientSessionImpl other = (ClientSessionImpl)xares;
 
-      return remotingConnection == other.remotingConnection;
+      return connectionManager == other.connectionManager;
    }
 
    public int prepare(final Xid xid) throws XAException
@@ -1052,7 +1051,7 @@
    public void rollback(final Xid xid) throws XAException
    {
       checkXA();
-      
+
       try
       {
          boolean wasStarted = started;
@@ -1067,12 +1066,12 @@
          {
             consumer.clear();
          }
-         
+
          flushAcks();
 
          SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
 
-         SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet);
+         SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
 
          if (wasStarted)
          {
@@ -1361,7 +1360,7 @@
 
       int state;
 
-      public ClientXAState(Xid xid)
+      public ClientXAState(final Xid xid)
       {
          this.xid = xid;
       }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java	2009-03-11 17:16:35 UTC (rev 6066)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/XATest.java	2009-03-11 20:01:08 UTC (rev 6067)
@@ -2171,6 +2171,24 @@
    }
 
 
+   
+   public void testIsSamRM() throws Exception
+   {
+      XAConnection conn = null;
+      
+      conn = xacf.createXAConnection();
+
+      //Create a session
+      XASession sess1 = conn.createXASession();
+      XAResource res1 = sess1.getXAResource();
+
+      //Create a session
+      XASession sess2 = conn.createXASession();
+      XAResource res2 = sess2.getXAResource();
+      
+      assertTrue(res1.isSameRM(res2));
+   }
+   
    public void testOneSessionTwoTransactionsRollbackSend() throws Exception
    {
       XAConnection conn = null;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java	2009-03-11 17:16:35 UTC (rev 6066)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java	2009-03-11 20:01:08 UTC (rev 6067)
@@ -30,8 +30,6 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
-import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
-
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -74,7 +72,7 @@
 
       clearData();
       addressSettings.clear();
-      configuration = createDefaultConfig();
+      configuration = createDefaultConfig(true);
       configuration.setSecurityEnabled(false);
       configuration.setJournalMinFiles(2);
       configuration.setPagingDirectory(getPageDir());
@@ -121,16 +119,35 @@
       super.tearDown();
    }
 
-   public void transactionManagerIntegration() throws Exception
+   public void testIsSameRM() throws Exception
    {
-      TransactionManagerImple tm = new TransactionManagerImple();
-      tm.begin();
+      ClientSessionFactory nettyFactory = createNettyFactory();
+      validateRM(nettyFactory, nettyFactory);
+      validateRM(sessionFactory, sessionFactory);
+      validateRM(nettyFactory, sessionFactory);
+   }
+   
+   private void validateRM(ClientSessionFactory factory1, ClientSessionFactory factory2) throws Exception
+   {
+      ClientSession session1 = factory1.createSession(true, false, false);
+      ClientSession session2 = factory2.createSession(true, false, false);
 
+      if (factory1 == factory2)
+      {
+         assertTrue(session1.isSameRM(session2));
+      }
+      else
+      {
+         assertFalse(session1.isSameRM(session2));
+      }
+
+      session1.close();
+      session2.close();
    }
 
    public void testSendPrepareDoesntRollbackOnClose() throws Exception
    {
-      Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+      Xid xid = newXID();
 
       ClientMessage m1 = createTextMessage(clientSession, "m1");
       ClientMessage m2 = createTextMessage(clientSession, "m2");
@@ -168,7 +185,7 @@
 
    public void testReceivePrepareDoesntRollbackOnClose() throws Exception
    {
-      Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+      Xid xid = newXID();
 
       ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
       ClientProducer clientProducer = clientSession2.createProducer(atestq);
@@ -249,30 +266,61 @@
 
    public void testSendMultipleQueues() throws Exception
    {
-      multipleQueuesInternalTest(false, false);
+      multipleQueuesInternalTest(true, false, false, false);
    }
 
+   public void testSendMultipleQueuesOnePhase() throws Exception
+   {
+      multipleQueuesInternalTest(true, false, false, true);
+      multipleQueuesInternalTest(false, false, true, true);
+   }
+
    public void testSendMultipleQueuesRecreate() throws Exception
    {
-      multipleQueuesInternalTest(false, true);
+      multipleQueuesInternalTest(true, false, true, false);
    }
 
    public void testSendMultipleSuspend() throws Exception
    {
-      multipleQueuesInternalTest(true, false);
+      multipleQueuesInternalTest(true, true, false, false);
    }
 
    public void testSendMultipleSuspendRecreate() throws Exception
    {
-      multipleQueuesInternalTest(true, true);
+      multipleQueuesInternalTest(true, true, true, false);
    }
 
+   public void testSendMultipleSuspendErrorCheck() throws Exception
+   {
+      ClientSession session = null;
+
+      session = sessionFactory.createSession(true, false, false);
+
+      Xid xid = newXID();
+
+      session.start(xid, XAResource.TMNOFLAGS);
+
+      try
+      {
+         session.start(xid, XAResource.TMRESUME);
+         fail("XAException expected");
+      }
+      catch (XAException e)
+      {
+         assertEquals(XAException.XAER_PROTO, e.errorCode);
+      }
+
+      session.close();
+   }
+
    /**
     * @throws MessagingException
     * @throws XAException
     */
-   protected void multipleQueuesInternalTest(boolean suspend, boolean recreateSession) throws MessagingException,
-                                                                                      XAException
+   protected void multipleQueuesInternalTest(boolean createQueues,
+                                             boolean suspend,
+                                             boolean recreateSession,
+                                             boolean onePhase) throws MessagingException, XAException
    {
       int NUMBER_OF_MSGS = 100;
       int NUMBER_OF_QUEUES = 10;
@@ -285,15 +333,18 @@
 
          session = sessionFactory.createSession(true, false, false);
 
-         for (int i = 0; i < NUMBER_OF_QUEUES; i++)
+         if (createQueues)
          {
-            session.createQueue(ADDRESS, ADDRESS.concat(Integer.toString(i)), true);
+            for (int i = 0; i < NUMBER_OF_QUEUES; i++)
+            {
+               session.createQueue(ADDRESS, ADDRESS.concat(Integer.toString(i)), true);
+            }
          }
 
          for (int tr = 0; tr < 2; tr++)
          {
 
-            Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+            Xid xid = newXID();
 
             session.start(xid, XAResource.TMNOFLAGS);
 
@@ -316,7 +367,10 @@
 
             session.end(xid, XAResource.TMSUCCESS);
 
-            session.prepare(xid);
+            if (!onePhase)
+            {
+               session.prepare(xid);
+            }
 
             if (recreateSession)
             {
@@ -330,7 +384,7 @@
             }
             else
             {
-               session.commit(xid, false);
+               session.commit(xid, onePhase);
             }
 
          }
@@ -338,7 +392,7 @@
          for (int i = 0; i < 2; i++)
          {
 
-            Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+            Xid xid = newXID();
 
             session.start(xid, XAResource.TMNOFLAGS);
 
@@ -362,6 +416,7 @@
 
                ClientMessage msg = consumer.receive(1000);
                assertNotNull(msg);
+               msg.acknowledge();
 
                if (suspend)
                {
@@ -406,6 +461,14 @@
       }
    }
 
+   /**
+    * @return
+    */
+   private XidImpl newXID()
+   {
+      return new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+   }
+
    class TxMessageHandler implements MessageHandler
    {
       boolean failedToAck = false;




More information about the jboss-cvs-commits mailing list