[jboss-cvs] JBoss Messaging SVN: r8139 - in branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774: src/main/org/jboss/jms/client/container and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 29 15:12:18 EST 2010


Author: jbertram at redhat.com
Date: 2010-11-29 15:12:16 -0500 (Mon, 29 Nov 2010)
New Revision: 8139

Modified:
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnection.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/ConnectionState.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/wireformat/ConnectionCreateSessionDelegateRequest.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
Log:
SOA-2526

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnection.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnection.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -267,7 +267,7 @@
       }
 
       SessionDelegate sessionDelegate =
-         delegate.createSessionDelegate(transacted, acknowledgeMode, isXA);
+         delegate.createSessionDelegate(transacted, acknowledgeMode, isXA, false);
       return new JBossSession(sessionDelegate, type);
    }
 

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -121,7 +121,7 @@
 
       // Create a consumer. The ClientConsumer knows we are a connection consumer so will
       // not call pre or postDeliver so messages won't be acked, or stored in session/tx.
-      sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
+      sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false, true);
           
       cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true, true);
 

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -121,11 +121,12 @@
       boolean transacted = ((Boolean)mi.getArguments()[0]).booleanValue();
       int ackMode = ((Integer)mi.getArguments()[1]).intValue();
       boolean xa = ((Boolean)mi.getArguments()[2]).booleanValue();
+      boolean isCC = ((Boolean)mi.getArguments()[3]).booleanValue();
 
       SessionState sessionState =
          new SessionState(connectionState, sessionDelegate, transacted,
                           ackMode, xa, sessionDelegate.getDupsOKBatchSize(), 
-                          connectionState.isEnableOrderingGroup(), connectionState.getDefaultOrderingGroupName());
+                          connectionState.isEnableOrderingGroup(), connectionState.getDefaultOrderingGroupName(), isCC);
 
       delegate.setState(sessionState);
       return delegate;

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -181,11 +181,11 @@
 
    public SessionDelegate createSessionDelegate(boolean transacted,
                                                 int acknowledgmentMode,
-                                                boolean isXA) throws JMSException
+                                                boolean isXA, boolean isCC) throws JMSException
    {
       RequestSupport req =
          new ConnectionCreateSessionDelegateRequest(id, version, transacted,
-                                                    acknowledgmentMode, isXA);
+                                                    acknowledgmentMode, isXA, isCC);
 
       return (SessionDelegate)doInvoke(client, req);
    }

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/ConnectionState.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/ConnectionState.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -172,7 +172,7 @@
          ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newDelegate.
             createSessionDelegate(sessionState.isTransacted(),
                                   sessionState.getAcknowledgeMode(),
-                                  sessionState.isXA());
+                                  sessionState.isXA(), sessionState.isCC());
 
          sessionDelegate.synchronizeWith(newSessionDelegate);
       }

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/SessionState.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/client/state/SessionState.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -126,11 +126,13 @@
    
    private String defaultOrderingGroupName;
    
+   private boolean isCC;
+   
    // Constructors ---------------------------------------------------------------------------------
-
+   
    public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
                        boolean transacted, int ackMode, boolean xa,
-                       int dupsOKBatchSize, boolean enableOrderingGroup, String defaultOrderingGroupName)
+                       int dupsOKBatchSize, boolean enableOrderingGroup, String defaultOrderingGroupName, boolean isCC)
    {
       super(parent, (DelegateSupport)delegate);
 
@@ -140,6 +142,7 @@
       this.acknowledgeMode = ackMode;
       this.transacted = transacted;
       this.xa = xa;
+      this.isCC = isCC;
       
       this.dupsOKBatchSize = dupsOKBatchSize;
 
@@ -439,6 +442,11 @@
       return xa;
    }
 
+   public boolean isCC()
+   {
+      return isCC;
+   }
+   
    public MessagingXAResource getXAResource()
    {
       return xaResource;

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/delegate/ConnectionEndpoint.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -41,7 +41,7 @@
 {
    SessionDelegate createSessionDelegate(boolean transacted,
                                          int acknowledgmentMode,
-                                         boolean isXA) throws JMSException;
+                                         boolean isXA, boolean isCC) throws JMSException;
 
    String getClientID() throws JMSException;
 

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -226,14 +226,14 @@
 
    public SessionDelegate createSessionDelegate(boolean transacted,
                                                 int acknowledgmentMode,
-                                                boolean isXA)
+                                                boolean isXA, boolean isCC)
       throws JMSException
    {
       try
       {
          log.trace(this + " creating " + (transacted ? "transacted" : "non transacted") +
             " session, " + Util.acknowledgmentMode(acknowledgmentMode) + ", " +
-            (isXA ? "XA": "non XA"));
+            (isXA ? "XA": "non XA") + ", " + (isCC ? "CC" : "non CC"));
 
          if (closed)
          {
@@ -248,6 +248,11 @@
          //Note we only replicate transacted and client acknowledge sessions.
          ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this,
          		                     transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE);
+         
+         if (isCC)
+         {
+            ep.setCC();
+         }
 
          synchronized (sessions)
          {

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -193,8 +193,11 @@
    private Map<Long, Long> failureCanceledDels;
 
    private AtomicBoolean isSuckerSession = new AtomicBoolean(false);
+   
+   private boolean isCC = false;
+   
+   private boolean markClose = false;
 
-
    // Constructors ---------------------------------------------------------------------------------
 
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
@@ -342,6 +345,30 @@
 
    public void close() throws JMSException
    {
+      if (isCC)
+      {
+         markClose = true;
+         checkClose();
+      }
+      else
+      {
+         trueClose();
+      }
+   }
+   
+   public void checkClose() throws JMSException
+   {
+      if (trace) {log.trace(this + " checking cc closing: " + markClose + " counter: " + deliveries.size()); }
+      
+      if (markClose && deliveries.size() == 0)
+      {
+         if (trace) {log.trace(this + " closing CC session now.");}
+         trueClose();
+      }
+   }
+   
+   public void trueClose() throws JMSException
+   {
       try
       {
          localClose();
@@ -1231,6 +1258,12 @@
 
             DeliveryRecord rec = (DeliveryRecord)entry.getValue();
 
+            // https://jira.jboss.org/browse/JBMESSAGING-1828
+            if (rec == null)
+            {
+               continue;
+            }
+
             //for a suck delivery, we need to update the state back to 'C'
             if (rec.del.isSucked())
             {
@@ -1772,6 +1805,11 @@
 
       //Need to send a message to the replicant to remove the id
       postOffice.sendReplicateAckMessage(rec.queueName, del.getReference().getMessage().getMessageID());
+      
+      if (isCC)
+      {
+         checkClose();
+      }
 
       return rec.del;
    }
@@ -1911,6 +1949,11 @@
       }
 
       if (trace) { log.trace(this + " acknowledged delivery " + ack); }
+      
+      if (isCC)
+      {
+         checkClose();
+      }
 
       return true;
    }
@@ -2514,6 +2557,19 @@
             		throw new TransactionException("Failed to handle send ack", e);
             	}
             }
+
+            if (isCC && del != null)
+            {
+               try
+               {
+                  checkClose();
+               }
+               catch (JMSException e)
+               {
+                  //we don't need to do anything here.
+                  log.warn("Exception closing a CC session " + this);
+               }
+            }
          }
       }
 
@@ -2536,4 +2592,14 @@
       return pm;
    }
 
+   public void setCC()
+   {
+      isCC = true;
+   }
+   
+   public boolean isCC()
+   {
+      return isCC;
+   }
+
 }

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -70,9 +70,9 @@
 
    public SessionDelegate createSessionDelegate(boolean transacted,
                                                 int acknowledgmentMode,
-                                                boolean isXA) throws JMSException
+                                                boolean isXA, boolean isCC) throws JMSException
    {
-      return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA);
+      return endpoint.createSessionDelegate(transacted, acknowledgmentMode, isXA, isCC);
    }
 
    public String getClientID() throws JMSException

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/wireformat/ConnectionCreateSessionDelegateRequest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/wireformat/ConnectionCreateSessionDelegateRequest.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/src/main/org/jboss/jms/wireformat/ConnectionCreateSessionDelegateRequest.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -45,6 +45,8 @@
    
    private boolean xa;
    
+   private boolean isCC;
+   
    public ConnectionCreateSessionDelegateRequest()
    {      
    }
@@ -52,7 +54,7 @@
    public ConnectionCreateSessionDelegateRequest(String objectId,
                                                  byte version,
                                                  boolean transacted, int ackMode,
-                                                 boolean xa)
+                                                 boolean xa, boolean isCC)
    {
       super(objectId, PacketSupport.REQ_CONNECTION_CREATESESSIONDELEGATE, version);
       
@@ -61,6 +63,8 @@
       this.acknowledgmentMode = ackMode;
       
       this.xa = xa;
+      
+      this.isCC = isCC;
    }
 
    public void read(DataInputStream is) throws Exception
@@ -72,6 +76,8 @@
       acknowledgmentMode = is.readInt();
       
       xa = is.readBoolean();
+      
+      isCC = is.readBoolean();
    }
 
    public ResponseSupport serverInvoke() throws Exception
@@ -84,7 +90,7 @@
          throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
       }
       
-      return new ConnectionCreateSessionDelegateResponse((ClientSessionDelegate)endpoint.createSessionDelegate(transacted, acknowledgmentMode, xa));         
+      return new ConnectionCreateSessionDelegateResponse((ClientSessionDelegate)endpoint.createSessionDelegate(transacted, acknowledgmentMode, xa, isCC));         
    }
 
    public void write(DataOutputStream os) throws Exception
@@ -99,6 +105,8 @@
       
       os.writeBoolean(xa);
       
+      os.writeBoolean(isCC);
+      
       os.flush();
    }
 

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -30,9 +30,22 @@
 import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
 
 import org.jboss.jms.client.JBossConnectionConsumer;
 import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
 
 import EDU.oswego.cs.dl.util.concurrent.Latch;
 
@@ -54,7 +67,9 @@
    // Static --------------------------------------------------------
    
    // Attributes ----------------------------------------------------
-
+   private TransactionManager tm;
+   private Transaction suspended;
+   
    // Constructors --------------------------------------------------
 
    public ConnectionConsumerTest(String name)
@@ -63,7 +78,35 @@
    }
 
    // TestCase overrides -------------------------------------------   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   
+      if (!ServerManagement.isRemote())
+      {
+         InitialContext localIc = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
+           
+         tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
+      
+         suspended = tm.suspend();
+      
+         log.debug("setup done");
+      }
+   }
 
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      if (!ServerManagement.isRemote())
+      {
+         if (suspended != null)
+         {
+            tm.resume(suspended);
+         }
+      }
+   }
+
    // Public --------------------------------------------------------
 
    public void testSimple() throws Exception
@@ -134,6 +177,52 @@
    }
 
 
+   public void testWaitForDeliveryFinish() throws Exception
+   {
+      if (ServerManagement.isRemote()) return;
+      
+      XAConnection connConsumer = null;
+      
+      Connection connProducer = null;
+      
+      try
+      {
+         connConsumer = cf.createXAConnection();
+         
+         connConsumer.start();
+                  
+         XASession sessCons = connConsumer.createXASession();
+         
+         TxMessageListener listener = new TxMessageListener();
+         
+         sessCons.setMessageListener(listener);
+         
+         MockServerSessionPool2 pool = new MockServerSessionPool2(sessCons, tm);
+         
+         JBossConnectionConsumer cc = (JBossConnectionConsumer)connConsumer.createConnectionConsumer(queue1, null, pool, 1);         
+         
+         connProducer = cf.createConnection();
+            
+         Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sessProd.createProducer(queue1);
+            
+         TextMessage m1 = sessProd.createTextMessage("a");
+         prod.send(m1);
+            
+         cc.close();
+         pool.shutdown();
+         
+         connProducer.close();
+         connProducer = null;
+         connConsumer.close();
+         connConsumer = null;            
+      }
+      finally 
+      {
+         if (connConsumer != null) connConsumer.close();
+         if (connConsumer != null) connProducer.close();
+      }
+   }
    
    public void testRedeliveryTransacted() throws Exception
    {
@@ -349,6 +438,20 @@
       }
    }
 
+   class TxMessageListener implements MessageListener
+   {
+      
+      public TxMessageListener()
+      {
+      }
+      
+      //need client ack
+      public void onMessage(Message msg)
+      {
+         log.info("Message got: " + msg);
+            
+      }
+   }
 
    class SimpleMessageListener implements MessageListener
    {
@@ -590,3 +693,95 @@
       return serverSession;
    }      
 }
+
+class MockServerSessionPool2 implements ServerSessionPool
+{
+   private MockServerSession2 serverSession;
+   
+   MockServerSessionPool2(XASession sess, TransactionManager tm)
+   {
+      serverSession = new MockServerSession2(sess, tm);
+   }
+
+   public void shutdown()
+   {
+      try
+      {
+         serverSession.join();
+      }
+      catch (InterruptedException e)
+      {
+      }
+   }
+
+   public ServerSession getServerSession() throws JMSException
+   {
+      return serverSession;
+   }      
+}
+
+class MockServerSession2 extends Thread implements ServerSession
+{
+   XASession session;
+   TransactionManager tm;
+   
+   MockServerSession2(XASession sess, TransactionManager tm)
+   {
+      this.session = sess;
+      this.tm = tm;
+   }
+   
+   public Session getSession() throws JMSException
+   {
+      return session;
+   }
+
+   public void run()
+   {
+      try
+      {
+         tm.begin();
+         
+         Transaction tx = tm.getTransaction();
+
+         XAResource res = session.getXAResource();
+         tx.enlistResource(res);
+         
+         session.run();
+         
+         try
+         {
+            Thread.sleep(5000);
+         }
+         catch (InterruptedException e)
+         {
+         }
+
+         tx.delistResource(res, XAResource.TMSUCCESS);
+
+         tm.commit();
+      }
+      catch (NotSupportedException e)
+      {
+      }
+      catch (SystemException e)
+      {
+      }
+      catch (IllegalStateException e)
+      {
+      }
+      catch (RollbackException e)
+      {
+      }
+      catch (SecurityException e)
+      {
+      }
+      catch (HeuristicMixedException e)
+      {
+      }
+      catch (HeuristicRollbackException e)
+      {
+      }
+   }
+   
+}

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -503,7 +503,7 @@
       public void testConnectionCreateSessionDelegateRequest() throws Exception
       {
          RequestSupport req =
-            new ConnectionCreateSessionDelegateRequest("23", (byte)77, true, 23, true);;
+            new ConnectionCreateSessionDelegateRequest("23", (byte)77, true, 23, true, false);
                  
          testPacket(req, PacketSupport.REQ_CONNECTION_CREATESESSIONDELEGATE);                           
       }

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2010-11-26 13:10:51 UTC (rev 8138)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809_JBMESSAGING-1774/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2010-11-29 20:12:16 UTC (rev 8139)
@@ -329,7 +329,7 @@
          return closed;
       }
 
-      public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA) throws JMSException
+      public SessionDelegate createSessionDelegate(boolean transacted, int acknowledgmentMode, boolean isXA, boolean isCC) throws JMSException
       {
          return null;
       }



More information about the jboss-cvs-commits mailing list