[jboss-cvs] JBoss Messaging SVN: r7121 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456: src/etc/remoting and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 28 21:55:49 EDT 2009


Author: jbertram at redhat.com
Date: 2009-05-28 21:55:48 -0400 (Thu, 28 May 2009)
New Revision: 7121

Added:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java
Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
Log:
[JBPAPP-2030] 


Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml	2009-05-29 01:55:48 UTC (rev 7121)
@@ -1748,6 +1748,8 @@
       <attribute name="SlowConsumers">false</attribute>
       
       <attribute name="StrictTck">true</attribute>
+      
+      <attribute name="SendAcksAsync">false</attribute>
 
       <attribute name="DefaultTempQueueFullSize">50000</attribute>
       
@@ -1891,42 +1893,174 @@
             bisocket transport to communicate.</para>
          </section>
       </section>
-      <!-- End conf.connectionfactory.attributes -->
-   </section>
-   <!-- End conf.connectionfactory -->
-   <section id="conf.connector">
-      <title>Configuring the remoting connector</title>
-      <para>JBoss Messaging uses JBoss Remoting for all client to server
-      communication. For full details of what JBoss Remoting is capable of and
-      how it is configured please consult the JBoss Remoting
-      documentation.</para>
-      <para>The default configuration includes a single remoting connector
-      which is used by the single default connection factory. Each connection
-      factory can be configured to use its own connector.</para>
-      <para>The default connector is configured to use the remoting bisocket
-      transport. The bisocket transport is a TCP socket based transport which
-      only listens and accepts connections on the server side. I.e.
-      connections are always initiated from the client side. This means it
-      works well in typical firewall scenarios where only inbound connections
-      are allowed on the server. Or where onlu outbound connections are
-      allowed from the client.</para>
-      <para>The bisocket transport can be configured to use SSL where a higher
-      level of security is required.</para>
-      <para>The other supported transport is the HTTP transport. This uses the
-      HTTP protocol to communicate between client and server. Data is received
-      on the client by the client periodically polling the server for
-      messages. This transport is well suited to situations where there is a
-      firewall between client and server which only allows incoming HTTP
-      traffic on the server. Please note this transport will not be as
-      performant as the bisocket transport due to the nature of polling and
-      the HTTP protocl. Also please note it is not designed for high load
-      situations.</para>
-      <para>No other remoting transports are currently supported by JBoss
-      Messaging</para>
-      <para>You can look at remoting configuration under:</para>
-      <para>&lt;JBoss&gt;/server/&lt;YourMessagingServer&gt;/deploy/jboss-messaging.sar/remoting-bisocket-service.xml</para>
-      <para>Here is an example bisocket remoting configuration:
-      <programlisting>
+      <section id="conf.connectionfactory.attributes.tckstrictbehavior">
+        <title>StrictTck</title>
+
+        <para>Set this to true if you want strict JMS behaviour as required by
+        the TCK.</para>
+      </section>
+      
+      <section id="conf.connectionfactory.attributes.sendacksasync">
+	<title>SendAcksAsync</title>
+	      
+	<para>Set this to true if you want acknowledgements to be sent asynchronously. This can speed up performance especially if you are using auto_acknowledge mode</para>
+      </section>
+
+      <section id="conf.connectionfactory.attributes.tempqueuepaging">
+        <title>Temporary queue paging parameters</title>
+
+        <para>DefaultTempQueueFullSize, DefaultTempQueuePageSize,
+        DefaultTempQueueDownCacheSize are optional attributes that determine
+        the default paging parameters to be used for any temporary
+        destinations scoped to connections created using this connection
+        factory. See the section on paging channels for more information on
+        what these values mean. They will default to values of 200000, 2000
+        and 2000 respectively if omitted.</para>
+      </section>
+
+      <section id="conf.connectionfactory.attributes.dupsokbatchsize">
+        <title>DupsOKBatchSize</title>
+
+        <para>When using a session with acknowledge mode of
+        DUPS_OK_ACKNOWLEDGE this setting determines how many acknowledgments
+        it will buffer locally before sending. The default value is
+        <literal>2000</literal></para>
+      </section>
+
+      <section id="conf.connectionfactory.attributes.supportsloadbalancing">
+        <title>SupportsLoadBalancing</title>
+
+        <para>When using a connection factory with a clustered JBoss Messaging
+        installation you can choose whether to enable client side connection
+        load-balancing. This is determined by setting the attribute
+        supportsLoadBalancing on the connection factory.</para>
+
+        <para>If load balancing is enabled on a connection factory then any
+        connections created with that connection factory will be load-balanced
+        across the nodes of the cluster. Once a connection is created on a
+        particular node, it stays on that node.</para>
+
+        <para>The exact policy that determines how connections are load
+        balanced is determined by the LoadBalancingFactory attribute</para>
+
+        <para>The default value is <literal>false</literal></para>
+      </section>
+
+      <section id="conf.connectionfactory.attributes.supportsfailover">
+        <title>SupportsFailover</title>
+
+        <para>When using a connection factory with a clustered JBoss Messaging
+        installation you can choose whether to enable client side automatic
+        failover. This is determined by setting the attribute supportsFailover
+        on the connection factory.</para>
+
+        <para>If automatic failover is enabled on a connection factory, then
+        if a connection problem is detected with the connection then JBoss
+        Messaging will automatically and transparently failover to another
+        node in the cluster.</para>
+
+        <para>The failover is transparent meaning the user can carry on using
+        the sessions, consumers, producers and connection objects as
+        before.</para>
+
+        <para>If automatic failover is not required, then this attribute can
+        be set to false. With automatic failover disabled it is up to the user
+        code to catch connection exceptions in synchronous JMS operations and
+        install a JMS ExceptionListener to catch exceptions asynchronously.
+        When an exception is caught, the client side code should look up a new
+        connection factory using HAJNDI and recreate the connection using
+        that.</para>
+
+        <para>The default value is <literal>false</literal></para>
+      </section>
+
+      <section id="conf.connectionfactory.attributes.disableremotingchecks">
+        <title>DisableRemotingChecks</title>
+
+        <para>By default, when deploying a connection factory, JBoss Messaging
+        checks that the corresponding JBoss Remoting Connector has "sensible"
+        values. JBoss Messaging is very sensitive to the values and for many
+        of them there's rarely a good reason to change them. To disable such
+        sanity checking set this to true. <warning>
+             There is rarely a good reason to disable checking. Only do so if you are absolutely sure of what you are doing.
+          </warning></para>
+
+        <para>The default value is <literal>false</literal></para>
+      </section>
+
+      <section id="conf.connectionfactory.attributes.loadbalancingfactory">
+        <title>LoadBalancingFactory</title>
+
+        <para>If you are using a connection factory with client side load
+        balancing then you can specify how the load balancing is implemented
+        by overriding this attribute. The value must correspond to the name of
+        a class which implements the interface
+        org.jboss.jms.client.plugin.LoadBalancingFactory</para>
+
+        <para>The default value is
+        org.jboss.jms.client.plugin.RoundRobinLoadBalancingFactory, which load
+        balances connections across the cluster in a round-robin fashion</para>
+      </section>
+
+      <section id="conf.connectionfactory.attributes.connector">
+        <title>Connector</title>
+
+        <para>This specifies which remoting connector this connection factory
+        uses. Different connection factories can use different
+        connectors.</para>
+
+        <para>For instance you could deploy one connection factory that
+        creates connections that use the HTTP transport to communicate to the
+        server and another that creates connections that use the bisocket
+        transport to communicate.</para>
+      </section>
+    </section>
+
+    <!-- End conf.connectionfactory.attributes -->
+  </section>
+
+  <!-- End conf.connectionfactory -->
+
+  <section id="conf.connector">
+    <title>Configuring the remoting connector</title>
+
+    <para>JBoss Messaging uses JBoss Remoting for all client to server
+    communication. For full details of what JBoss Remoting is capable of and
+    how it is configured please consult the JBoss Remoting
+    documentation.</para>
+
+    <para>The default configuration includes a single remoting connector which
+    is used by the single default connection factory. Each connection factory
+    can be configured to use its own connector.</para>
+
+    <para>The default connector is configured to use the remoting bisocket
+    transport. The bisocket transport is a TCP socket based transport which
+    only listens and accepts connections on the server side. I.e. connections
+    are always initiated from the client side. This means it works well in
+    typical firewall scenarios where only inbound connections are allowed on
+    the server. Or where only outbound connections are allowed from the
+    client.</para>
+
+    <para>The bisocket transport can be configured to use SSL where a higher
+    level of security is required.</para>
+
+    <para>The other supported transport is the HTTP transport. This uses the
+    HTTP protocol to communicate between client and server. Data is received
+    on the client by the client periodically polling the server for messages.
+    This transport is well suited to situations where there is a firewall
+    between client and server which only allows incoming HTTP traffic on the
+    server. Please note this transport will not be as performant as the
+    bisocket transport due to the nature of polling and the HTTP protocol. Also
+    please note it is not designed for high load situations.</para>
+
+    <para>No other remoting transports are currently supported by JBoss
+    Messaging</para>
+
+    <para>You can look at remoting configuration under:</para>
+
+    <para>&lt;JBoss&gt;/server/&lt;YourMessagingServer&gt;/deploy/jboss-messaging.sar/remoting-bisocket-service.xml</para>
+
+    <para>Here is an example bisocket remoting configuration: <programlisting>
          &lt;config&gt;
             &lt;invoker transport="bisocket"&gt;
             
@@ -2016,4 +2150,4 @@
       the service binding manager for JBoss Messaging</para>
    </section>
    <!-- End conf.callback -->
-</chapter>
\ No newline at end of file
+</chapter>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml	2009-05-29 01:55:48 UTC (rev 7121)
@@ -38,7 +38,10 @@
                <attribute name="stopLeaseOnFailure" isParam="true">true</attribute>
                
                <!-- Periodicity of client pings. Server window by default is twice this figure -->                               
-               <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+               <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+               
+               <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+               <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
 	       	       
 	       <attribute name="timeout" isParam="true">0</attribute>
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml	2009-05-29 01:55:48 UTC (rev 7121)
@@ -36,7 +36,10 @@
                <attribute name="stopLeaseOnFailure" isParam="true">true</attribute>
                               
                <!-- Periodicity of client pings. Server window by default is twice this figure -->                               
-               <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+               <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+               
+               <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+               <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
 	       
 	       <attribute name="timeout" isParam="true">0</attribute>
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml	2009-05-29 01:55:48 UTC (rev 7121)
@@ -120,6 +120,10 @@
 
       <attribute name="StrictTck">true</attribute>
       
+      <!- - Should acknowledgements be sent asynchronously? - ->
+      
+      <attribute name="SendAcksAsync">false</attribute>
+      
       <!- - Disable JBoss Remoting Connector sanity checks - There is rarely a good reason to set this to true - ->
       
       <attribute name="DisableRemotingChecks">false</attribute>
@@ -146,4 +150,4 @@
    
    -->
 
-</server>
\ No newline at end of file
+</server>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml	2009-05-29 01:55:48 UTC (rev 7121)
@@ -118,6 +118,12 @@
       <type>boolean</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="isSendAcksAsync" setMethod="setSendAcksAsync">
+      <description>Should acknowledgements be sent asynchronously?</description>
+      <name>SendAcksAsync</name>
+      <type>boolean</type>
+   </attribute>
+   
    <attribute access="read-write" getMethod="isDisableRemotingChecks" setMethod="setDisableRemotingChecks">
       <description>Disable remoting connector parameter sanity checks. There is rarely a good reason to set this to true</description>
       <name>DisableRemotingChecks</name>
@@ -146,4 +152,4 @@
       <name>destroy</name>
    </operation>
 
-</mbean>
\ No newline at end of file
+</mbean>

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -31,7 +31,6 @@
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.BrowserState;
 import org.jboss.jms.client.state.ConnectionState;
@@ -92,14 +91,6 @@
          Version versionToUse = connectionDelegate.getVersionToUse();
          JMSRemotingConnection remotingConnection = connectionDelegate.getRemotingConnection();
 
-         // install the consolidated remoting connection listener; it will be de-installed on
-         // connection closing by ConnectionAspect
-
-         ConsolidatedRemotingConnectionListener listener =
-            new ConsolidatedRemotingConnectionListener();
-
-         remotingConnection.addConnectionListener(listener);
-
          if (versionToUse == null)
          {
             throw new IllegalStateException("Connection version is null");
@@ -109,7 +100,8 @@
             new ConnectionState(serverID, connectionDelegate,
                                 remotingConnection, versionToUse);
 
-         listener.setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().setConnectionState(connectionState);
+         remotingConnection.getConnectionListener().start();
           
          connectionDelegate.setState(connectionState);
       }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -95,7 +95,7 @@
          if (trace) log.trace("Trying communication on server(" + server + ")=" + delegates[server].getServerLocatorURI());
          try
          {
-            remoting = new JMSRemotingConnection(delegates[server].getServerLocatorURI(), true, delegates[server].getStrictTck());
+            remoting = new JMSRemotingConnection(delegates[server].getServerLocatorURI(), true, delegates[server].getStrictTck(), delegates[server].isSendAcksAsync());
             remoting.start();
             currentDelegate = delegates[server];
             if (trace) log.trace("Adding callback");

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -30,6 +30,7 @@
 import javax.jms.JMSException;
 
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.CreateConnectionResult;
@@ -77,6 +78,8 @@
 
    private boolean strictTck;
    
+   private boolean sendAcksAsync;
+   
    // Static ---------------------------------------------------------------------------------------
    
    /*
@@ -106,7 +109,8 @@
    // Constructors ---------------------------------------------------------------------------------
 
    public ClientConnectionFactoryDelegate(String uniqueName, String objectID, int serverID, String serverLocatorURI,
-                                          Version serverVersion, boolean clientPing, boolean strictTck)
+                                          Version serverVersion, boolean clientPing, boolean strictTck,
+                                          boolean sendAcksAsync)
    {
       super(objectID);
 
@@ -116,6 +120,7 @@
       this.serverVersion = serverVersion;
       this.clientPing = clientPing;
       this.strictTck = strictTck;
+      this.sendAcksAsync = sendAcksAsync;
    }
    
    public ClientConnectionFactoryDelegate()
@@ -149,7 +154,7 @@
       
       try
       {         
-         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
+         remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck, new ConsolidatedRemotingConnectionListener(), sendAcksAsync);
          
          remotingConnection.start();
    
@@ -271,8 +276,13 @@
        return strictTck;
    }
 
-    public void synchronizeWith(DelegateSupport newDelegate) throws Exception
+   public boolean isSendAcksAsync()
    {
+      return this.sendAcksAsync;
+   }
+   
+   public void synchronizeWith(DelegateSupport newDelegate) throws Exception
+   {
       super.synchronizeWith(newDelegate);
    }
 
@@ -327,6 +337,8 @@
       clientPing = in.readBoolean();
 
       strictTck = in.readBoolean();
+      
+      sendAcksAsync = in.readBoolean();
    }
 
    public void write(DataOutputStream out) throws Exception
@@ -342,6 +354,8 @@
       out.writeBoolean(clientPing);
 
       out.writeBoolean(strictTck);
+      
+      out.writeBoolean(sendAcksAsync);
    }
 
    // Inner Classes --------------------------------------------------------------------------------

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -93,6 +93,8 @@
    private int dupsOKBatchSize;
    
    private boolean strictTck;
+   
+   private boolean sendAcksAsync;
 
    // Static ---------------------------------------------------------------------------------------
 
@@ -132,6 +134,8 @@
       onewayClient = conn.getOnewayClient();
       
       strictTck = conn.isStrictTck();
+      
+      sendAcksAsync = conn.isSendAcksAsync();
    }
 
    public void setState(HierarchicalState state)
@@ -145,6 +149,8 @@
       onewayClient = conn.getOnewayClient();
       
       strictTck = conn.isStrictTck();
+      
+      sendAcksAsync = conn.isSendAcksAsync();
    }
 
    // Closeable implementation ---------------------------------------------------------------------
@@ -169,14 +175,30 @@
    {
       RequestSupport req = new SessionAcknowledgeDeliveryRequest(id, version, ack);
 
-      return ((Boolean)doInvoke(client, req)).booleanValue();
+      if (sendAcksAsync)
+      {
+         doInvokeOneway(onewayClient, req);
+         
+         return true;
+      }
+      else
+      {
+         return ((Boolean)doInvoke(client, req)).booleanValue();
+      }
    }
 
    public void acknowledgeDeliveries(List acks) throws JMSException
    {
       RequestSupport req = new SessionAcknowledgeDeliveriesRequest(id, version, acks);
-
-      doInvoke(client, req);
+      
+      if (sendAcksAsync)
+      {
+         doInvokeOneway(onewayClient, req);
+      }
+      else
+      {
+         doInvoke(client, req);
+      }
    }
 
    /**

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -42,6 +42,8 @@
    private ExceptionListener jmsExceptionListener;
 
    private ConnectionFailureListener remotingListener;
+      
+   private boolean started;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -53,6 +55,11 @@
 
    public void handleConnectionException(Throwable throwable, Client client)
    {
+      if (!started)
+      {
+         return;
+      }
+            
       // forward the exception to delegate listener and JMS ExceptionListeners; synchronize
       // to avoid race conditions
 
@@ -162,6 +169,11 @@
       }
       return state + ".ConsolidatedListener";
    }
+      
+   public void start()
+   {
+      started = true;
+   }
 
    // Package protected ----------------------------------------------------------------------------
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -244,6 +244,7 @@
    private InvokerLocator serverLocator;
    private CallbackManager callbackManager;
    private boolean strictTck;
+   private boolean sendAcksAsync;
 
    // When a failover is performed, this flag is set to true
    protected boolean failed = false;
@@ -254,12 +255,20 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck) throws Exception
+   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck,
+                                boolean sendAcksAsync) throws Exception
    {
+      this(serverLocatorURI, clientPing, strictTck, null, sendAcksAsync);
+   }
+      
+   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck, ConsolidatedRemotingConnectionListener listener, boolean sendAcksAsync) throws Exception
+   {
       serverLocator = new InvokerLocator(serverLocatorURI);
       this.clientPing = clientPing;
       this.strictTck = strictTck;
-
+      this.sendAcksAsync = sendAcksAsync;
+      this.remotingConnectionListener = listener;
+      
       log.trace(this + " created");
    }
 
@@ -316,7 +325,14 @@
       {
          public Object run() throws Exception
          {
-            client.connect();
+            if (remotingConnectionListener != null)
+            {
+               client.connect(remotingConnectionListener, serverLocator.getParameters());
+            }
+            else
+            {
+               client.connect();
+            }
             onewayClient.connect();
             return null;
          }
@@ -345,7 +361,7 @@
    public void stop()
    {
       log.trace(this + " stop");
-
+      
       // explicitly remove the callback listener, to avoid race conditions on server
       // (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
 
@@ -408,6 +424,11 @@
    {
        return strictTck;
    }
+   
+   public boolean isSendAcksAsync()
+   {
+      return sendAcksAsync;
+   }
 
    public synchronized boolean isFailed()
    {
@@ -421,6 +442,11 @@
    public synchronized void setFailed()
    {
       failed = true;
+      
+      if (client == null) 
+      {
+         return;
+      }
 
       // Remoting has the bad habit of letting the job of cleaning after a failed connection up to
       // the application. Here, we take care of that, by disconnecting the remoting client, and
@@ -429,7 +455,7 @@
 
       try
       {
-      	client.setDisconnectTimeout(0);
+         client.setDisconnectTimeout(0);
       }
       catch (Throwable ignore)
       {      	
@@ -465,7 +491,7 @@
       return true;
    }
 
-   public synchronized void addPlainConnectionListener(ConnectionListener listener)
+   public synchronized void addPlainConnectionListener(final ConnectionListener listener)
    {
       client.addConnectionListener(listener);
    }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -49,7 +49,8 @@
                                  boolean supportsFailover,
                                  boolean supportsLoadBalancing,
                                  LoadBalancingFactory loadBalancingPolicy,
-                                 boolean strictTck) throws Exception;
+                                 boolean strictTck,
+                                 boolean sendAcksAsync) throws Exception;
 
    void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing) throws Exception;
 }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -1178,7 +1178,10 @@
    {
       if (sessions.remove(id) == null)
       {
-         throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+         //We don't throw an exception here, as the session may already have been removed by the server-side
+         //failure handler (SimpleConnectionManager), which is deemed normal behavior in an application environment.
+         if (log.isTraceEnabled()) { log.trace("Cannot find session with id " + id + " to remove"); }
+         // throw new IllegalStateException("Cannot find session with id " + id + " to remove");
       }
    }
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -78,6 +78,8 @@
 
    private boolean strictTck;
    
+   private boolean sendAcksAsync;
+   
    private boolean disableRemotingChecks;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -202,7 +204,7 @@
                                       locatorURI, enablePing, prefetchSize, slowConsumers,
                                       defaultTempQueueFullSize, defaultTempQueuePageSize,                                      
                                       defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
-                                      loadBalancingFactory, strictTck);               
+                                      loadBalancingFactory, strictTck, sendAcksAsync);               
          
          String info = "Connector " + locator.getProtocol() + "://" +
             locator.getHost() + ":" + locator.getPort();
@@ -429,6 +431,22 @@
    	this.strictTck = strictTck;
    }
    
+   public boolean isSendAcksAsync()
+   {
+      return this.sendAcksAsync;
+   }
+
+   public void setSendAcksAsync(boolean async)
+   {
+      if (started)
+      {
+         log.warn("SendAcksAsync can only be changed when connection factory is stopped");
+         return;         
+      }
+
+      this.sendAcksAsync = async;
+   }
+   
    public boolean isDisableRemotingChecks()
    {
    	return disableRemotingChecks;

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -121,7 +121,8 @@
                                                       boolean supportsFailover,
                                                       boolean supportsLoadBalancing,
                                                       LoadBalancingFactory loadBalancingFactory,
-                                                      boolean strictTck)
+                                                      boolean strictTck,
+                                                      boolean sendAcksAsync)
       throws Exception
    {
       log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
@@ -175,7 +176,8 @@
 
       ClientConnectionFactoryDelegate localDelegate =
          new ClientConnectionFactoryDelegate(uniqueName, id, serverPeer.getServerPeerID(),
-                                             locatorURI, version, clientPing, useStrict);
+                                             locatorURI, version, clientPing, useStrict,
+                                             sendAcksAsync);
 
       log.debug(this + " created local delegate " + localDelegate);
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -29,8 +29,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.JMSException;
-
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
@@ -167,6 +165,7 @@
          
          return e;
       }
+      
       return null;
    }
    
@@ -221,17 +220,20 @@
    {  
       if (t instanceof ClientDisconnectedException)
       {
-         // This is OK
-         if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
-         return;
+         if (log.isTraceEnabled())
+         {
+            log.trace("Connection is closed normally: " + client);
+         }
       }
       else
       {
-         if (trace) { log.trace(this + " detected failure on client " + client, t); }
+         if (log.isTraceEnabled())
+         {
+            log.trace("Connection is closed on failure event: " + client);
+         }
       }
+      String remotingSessionID = client.getSessionId();
 
-      String remotingSessionID = client.getSessionId();
-      
       if (remotingSessionID != null)
       {
          handleClientFailure(remotingSessionID);
@@ -401,7 +403,7 @@
 
             try
             {
-               ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+               ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
             }
             catch (Throwable ignore)
             {
@@ -421,10 +423,10 @@
    {      
    	String jmsClientID = remotingSessions.get(jmsSessionID);
    	
-      log.warn("A problem has been detected " +
-               "with the connection to remote client " +
-               jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
-            "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+      log.trace("A problem has been detected " +
+                    "with the connection to remote client " +
+                     jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+                  "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
    	
    	if (jmsClientID != null)
    	{      	
@@ -467,7 +469,6 @@
                catch (Throwable ignore)
                {            	
                }
-               
                return;
          	}
          }
@@ -493,7 +494,7 @@
    
                try
                {
-                  ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+                  ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
                }
                catch (Throwable ignore)
                {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -95,7 +95,7 @@
    // Attributes -----------------------------------------------------------------------------------
 
    private String id;
-
+   
    private volatile boolean closed;
    private volatile boolean started;
 
@@ -366,6 +366,13 @@
    {
       try
       {
+         //Reason for synchronization:
+         //Sometimes the server side detects a connection failure while the
+         //client side is still working normally. So it is possible for the client side to call
+         //connection.close() while, in the meantime, the server-side connection
+         //failure handler calls it as well.
+         synchronized (this)
+         {
          if (trace) { log.trace(this + " close()"); }
 
          if (closed)
@@ -437,11 +444,15 @@
             temporaryDestinations.clear();
          }
 
+         closed = true;
+      }
+      
+         //We put this outside the synchronized block to avoid a deadlock in which
+         //SimpleConnectionManager.handleClientFailure() holds its own lock and then tries to call this close(), which requires the lock on this,
+         //while this close() (called from the client) holds its own lock and calls unregisterConnection(), which requires the lock on SimpleConnectionManager.
          cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
 
          Dispatcher.instance.unregisterTarget(id, this);
-
-         closed = true;
       }
       catch (Throwable t)
       {
@@ -649,7 +660,10 @@
       {
          if (sessions.remove(sessionId) == null)
          {
-            throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+            //Don't throw an exception here, because the session close may be
+            //called from the server side (SimpleConnectionManager) before the client side.
+            if (trace) { log.trace("Cannot find session with id " + sessionId + " to remove"); }
+            //throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
          }
       }
    }
@@ -752,7 +766,7 @@
       else if (dest.isQueue())
       {
          if (trace) { log.trace(this + " routing " + msg + " to queue"); }
-        if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
+         if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
          {
             throw new JMSException("Failed to route " + ref + " to " + dest.getName());
          }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -1119,16 +1119,22 @@
       {
          if (consumers.remove(consumerId) == null)
          {
-            throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+            if (trace) { log.trace("Cannot find consumer with id " + consumerId + " to remove"); }
+            //don't throw, as it may be called twice: once from the client and once from the server's connection failure handler.
+            //throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
          }
       }
    }
 
    void localClose() throws Throwable
    {
+           
       if (closed)
       {
-         throw new IllegalStateException("Session is already closed");
+         //don't throw the exception, as it may be called twice
+         if (trace) { log.trace("Session is already closed. "); }
+         return;
+         //throw new IllegalStateException("Session is already closed");
       }
 
       if (trace) log.trace(this + " close()");
@@ -1167,10 +1173,12 @@
       //Note we don't maintain order using a LinkedHashMap since then we lose
       //concurrency since we would have to lock it exclusively
 
-      List entries = new ArrayList(deliveries.entrySet());
+      synchronized (deliveries)
+      {
+         List entries = new ArrayList(deliveries.entrySet());
 
-      //Sort them in reverse delivery id order
-      Collections.sort(entries,
+         //Sort them in reverse delivery id order
+         Collections.sort(entries,
                        new Comparator()
                        {
                            public int compare(Object obj1, Object obj2)
@@ -1183,39 +1191,46 @@
                            }
                        });
 
-      Iterator iter = entries.iterator();
+         Iterator iter = entries.iterator();
 
-      Set channels = new HashSet();
+         Set channels = new HashSet();
 
-      if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
+         if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
 
-      while (iter.hasNext())
-      {
-         Map.Entry entry = (Map.Entry)iter.next();
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
 
-         if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
+            if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
 
-         DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+            DeliveryRecord rec = (DeliveryRecord)entry.getValue();
 
-         rec.del.cancel();
+            /*
+             * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+             */
+            if (!rec.del.isXAPrepared())
+            {
+               rec.del.cancel();
+            }
 
-         channels.add(rec.del.getObserver());
-      }
+            channels.add(rec.del.getObserver());
+         }
 
-      promptDelivery(channels);
+         promptDelivery(channels);
 
-      //Close down the executor
+         //Close down the executor
 
-      //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
-      //prompter is queued and starts to execute
-      //prompter almost finishes executing then a message is cancelled due to this session closing
-      //this causes another prompter to be queued
-      //shutdownAfterProcessingCurrentTask is then called
-      //this means the second prompter never runs and the cancelled message doesn't get redelivered
-      executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+         //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
+         //prompter is queued and starts to execute
+         //prompter almost finishes executing then a message is cancelled due to this session closing
+         //this causes another prompter to be queued
+         //shutdownAfterProcessingCurrentTask is then called
+         //this means the second prompter never runs and the cancelled message doesn't get redelivered
+         executor.shutdownAfterProcessingCurrentlyQueuedTasks();
 
-      deliveries.clear();
-
+         deliveries.clear();
+      }
+      
       sp.removeSession(id);
 
       Dispatcher.instance.unregisterTarget(id, this);
@@ -1225,7 +1240,11 @@
 
    void cancelDelivery(long deliveryId) throws Throwable
    {
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+      DeliveryRecord rec = null;
+      synchronized(deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+      }
 
       if (rec == null)
       {
@@ -1436,7 +1455,7 @@
       {
          // one way invocation, no acknowledgment sent back by the client
          if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-
+         
          callbackHandler.handleCallbackOneway(callback);
 
          //We store the delivery id so we know to wait for any deliveries in transit on close
@@ -1569,7 +1588,11 @@
 
    private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
    {
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+      DeliveryRecord rec = null;
+      synchronized (deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+      }
 
       if (rec == null)
       {
@@ -1717,7 +1740,19 @@
    {
       if (trace) { log.trace(this + " acknowledging delivery " + ack); }
 
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+      DeliveryRecord rec = null;
+      
+      //This is synchronized to prevent the following from happening:
+      //a clustered server node detects a connection failure and cancels deliveries,
+      //but the consumer on it still gets through to here.
+      //If not synchronized, the remove may get the record before the above cancel action clears the deliveries map,
+      //so the cancel action puts the message back on the queue and this method causes the delivery count to decrement.
+      //As the cancel also decreases the delivery count once, the delivery count would end up being decremented twice
+      //for the same message.
+      synchronized (deliveries)
+      {
+         rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+      }
 
       if (rec == null)
       {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -48,4 +48,9 @@
    void cancel() throws Throwable;   
    
    boolean isRecovered();
+
+   /**
+    * Indicates whether this delivery is associated with a prepared XA transaction.
+    */
+   boolean isXAPrepared();
 }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -283,6 +283,13 @@
       acknowledgeInternal(d, null, false);
    }
 
+   /*
+    * Note: If an XA transaction has not been committed when a failure happens, cancelling the
+    * delivery shouldn't put the transactional message back for redelivery.
+    * It must remain in the DB until transaction recovery happens.
+    * 
+    * @see org.jboss.messaging.core.contract.DeliveryObserver#cancel(org.jboss.messaging.core.contract.Delivery)
+    */
    public void cancel(Delivery del) throws Throwable
    {
       //We may need to update the delivery count in the database

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -50,6 +50,7 @@
    private DeliveryObserver observer;
    private MessageReference reference;   
    private boolean recovered;
+   private Transaction tx;
 
    private boolean trace = log.isTraceEnabled();
 
@@ -73,6 +74,7 @@
       this.observer = observer;
       this.selectorAccepted = selectorAccepted;
       this.recovered = recovered;
+      this.tx = null;
    }
 
    // Delivery implementation ----------------------------------------------------------------------
@@ -95,6 +97,8 @@
    public void acknowledge(Transaction tx) throws Throwable
    {        
       if (trace) { log.trace(this + " acknowledging delivery " + ( tx == null ? "non-transactionally" : "in " + tx)); }
+
+      this.tx = tx;
       
       observer.acknowledge(this, tx);
    }
@@ -118,6 +122,20 @@
       return "Delivery" + (reference == null ? "" : "[" + reference + "]");
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.contract.Delivery#isXAPrepared()
+    */
+   public boolean isXAPrepared()
+   {
+      if (tx != null) {
+         if (tx.getXid() != null)
+         {
+            return tx.getState() == Transaction.STATE_PREPARED;
+         }
+      }
+      return false;
+   }
+
    // Package protected ----------------------------------------------------------------------------
    
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -32,12 +32,12 @@
  * A NamedThreadQueuedExecutor
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * @deprecated
  *
  */
 public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
-	private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-	  	  
+{	  	  
 	private final String name;
 	
 	private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
@@ -49,10 +49,6 @@
 		this.name = name;
 		
 		setThreadFactory(new Factory());
-		
-		clearThread();
-		
-		restart();
 	}
 	
 	private class Factory implements ThreadFactory

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (from rev 6192, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -0,0 +1,291 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.remoting.Client;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.Command;
+import org.jboss.test.messaging.tools.container.Server;
+
+/**
+ * Tests that in-flight deliveries are cleaned up correctly when a client connection fails (JBMESSAGING-1456).
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Mar 26, 2009 3:14:28 PM
+ *
+ */
+public class DeliveryOnConnectionFailureTest extends JMSTestCase
+{
+
+   /**
+    * Creates the test case.
+    *
+    * @param name the test name, passed unchanged to the superclass constructor
+    */
+   public DeliveryOnConnectionFailureTest(String name)
+   {
+      super(name);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+   //"Message stuck" means messages are kept in the delivering state and are never delivered again
+   //unless the server is restarted (for persistent messages).
+   //This can happen under the following conditions:
+   //1. The client ping times out and JBM tries to disconnect from the server (this happens in a cluster).
+   //2. Due to a network/remoting issue, the server receives a 'normal' disconnection event.
+   //3. The server assumes the client has already closed its connection and therefore doesn't clean up.
+   //4. So the connection at the server is left open, including the sessions created on that connection.
+   //5. If the sessions contain messages being delivered, those messages will appear stuck.
+   //To fix this, either the server side must always clean up the connection whenever a disconnection
+   //happens, or the remoting layer must handle this correctly.
+   //
+   //Here we simplify the situation. First start the server and get a connection to it. Then
+   //we send a message to the server and receive it on a client-ack session without acknowledging it.
+   //Next we call client.disconnect() directly from the client without closing the connection;
+   //the server should cancel the delivery. Then we receive the message again and ack it.
+   /**
+    * Checks that a message which was received but not acknowledged is delivered again after the
+    * client's remoting connection is dropped without the JMS connection being closed
+    * (JBMESSAGING-1456).
+    *
+    * @throws Exception on any lookup/JMS failure, or if the wait for server side cleanup is interrupted
+    */
+   public void testMessageStuckOnConnectionFailure() throws Exception
+   {
+      ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+      
+      JBossConnection conn1 = null;
+      JBossConnection conn2 = null;
+
+      try
+      {
+         //create a connection
+         conn1 = (JBossConnection)cf.createConnection();
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         TextMessage msg = sess1.createTextMessage("dont-stuck-me!");
+         conn1.start();
+         
+         //send a message
+         prod1.send(msg);
+         
+         //receive the message but do not ack it
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         TextMessage rm = (TextMessage)cons1.receive(2000);
+         
+         assertNotNull("message was not delivered to the first consumer", rm);
+         assertEquals("dont-stuck-me!", rm.getText());
+         
+         //break the connection at the remoting level, bypassing the JMS close()
+         JMSRemotingConnection jmsConn = ((ClientConnectionDelegate)conn1.getDelegate()).getRemotingConnection();
+         Client rmClient = jmsConn.getRemotingClient();
+         rmClient.disconnect();
+         
+         //wait for server side cleanup. An interrupt is allowed to abort the test instead of
+         //being swallowed: continuing after a shortened wait could fail spuriously.
+         Thread.sleep(5000);
+         
+         //now receive the message on a fresh connection
+         conn2 = (JBossConnection)cf.createConnection();
+         conn2.start();
+         Session sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         TextMessage rm2 = (TextMessage)cons2.receive(2000);
+
+         assertNotNull("message was not redelivered after the connection failure", rm2);
+         assertEquals("dont-stuck-me!", rm2.getText());
+         rm2.acknowledge();
+         
+         //Message count should be zero.
+         //this is checked in tearDown().
+      }
+      finally
+      {
+         //conn1 was deliberately broken above, so its close() may throw; the nested
+         //finally guarantees that conn2 is still closed in that case.
+         try
+         {
+            if (conn1 != null)
+            {
+               conn1.close();
+            }
+         }
+         finally
+         {
+            if (conn2 != null)
+            {
+               conn2.close();
+            }
+         }
+      }
+   }
+   
+   //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+   //Another issue in JIRA 1456 is a negative message count.
+   //This test guarantees the message count won't go negative.
+   //Error scenario:
+   // 1. The server side detects the connection failure (lease timeout) and closes the connection/session.
+   // 2. The session endpoint cancels the messages being delivered back to the queue.
+   // 3. At the same time the client side has received some of the messages and acknowledges them.
+   // 4. The acknowledge action decreases the delivering count of the queue, and the session endpoint
+   //    cancel also decreases the delivering count.
+   // 5. If not synchronized, one message may be canceled and acked at the same time, so the delivering count
+   //    is decreased twice for that message, resulting in a negative message count.
+   //
+   //The test first creates a connection and sends messages, then receives them and acks the last
+   //message (client-ack) while simulating a server-side connection failure to trigger server-side
+   //clean up. Without proper synchronization this can trigger the issue described above.
+   //At the end the message count is checked; it should always be zero.
+   /**
+    * Acknowledges the last of a batch of received messages while a helper thread triggers the
+    * server side client-failure handling, then drains whatever was canceled back to the queue
+    * (JBMESSAGING-1456). The resulting message count is checked in tearDown().
+    *
+    * @throws Exception on any lookup/JMS failure, or if waiting for the helper thread is interrupted
+    */
+   public void testMessageCountOnConnectionFailure() throws Exception
+   {
+      ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+      
+      JBossConnection conn1 = null;
+      JBossConnection conn2 = null;
+      
+      try
+      {
+         conn1 = (JBossConnection)cf.createConnection();
+         conn1.start();
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         
+         //now send messages
+         MessageProducer prod1 = sess1.createProducer(queue1);
+         
+         final int NUM_MSG = 2000;
+         for (int i = 0; i < NUM_MSG; ++i)
+         {
+            TextMessage tm = sess1.createTextMessage("-m"+i);
+            prod1.send(tm);
+         }
+         
+         //receive the messages
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         for (int j = 0; j < NUM_MSG-1; ++j)
+         {
+            TextMessage rm = (TextMessage)cons1.receive(2000);
+            assertNotNull(rm);
+            assertEquals("-m"+j, rm.getText());
+         }
+         
+         //last message
+         TextMessage lastRm = (TextMessage)cons1.receive(2000);
+         assertNotNull(lastRm);
+         assertEquals("-m"+(NUM_MSG-1), lastRm.getText());
+         
+         final ServerClientFailureCommand cmd = new ServerClientFailureCommand();
+         
+         //Anything thrown on the helper thread is parked here and reported from the test
+         //thread: a fail() raised on another thread never reaches the JUnit runner.
+         final Throwable[] cmdFailure = new Throwable[1];
+         
+         Thread exeThr = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  ServerManagement.getServer().executeCommand(cmd);
+               }
+               catch (Throwable t)
+               {
+                  //Throwable, not Exception: the command's own assertions raise Errors
+                  log.error("failed to invoke command", t);
+                  cmdFailure[0] = t;
+               }               
+            }
+         };
+         
+         exeThr.start();
+
+         //ack last message, making server side ack happening.
+         lastRm.acknowledge();
+
+         //wait for the server side clean up to finish before draining the queue, and
+         //surface any failure of the command as a real test failure
+         exeThr.join(30000);
+         assertFalse("server side failure command did not complete", exeThr.isAlive());
+         if (cmdFailure[0] != null)
+         {
+            fail("failure in executing command: " + cmdFailure[0]);
+         }
+
+         //receive possible canceled messages
+         conn2 = (JBossConnection)cf.createConnection();
+         conn2.start();
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         while (cons2.receive(2000) != null)
+         {
+            //discard; auto-ack removes the message from the queue
+         }
+         
+         //check message count
+         //tearDown will do the check.
+      }
+      finally
+      {
+         //conn1's server side was forcibly failed above, so its close() may throw; the
+         //nested finally guarantees that conn2 is still closed in that case.
+         try
+         {
+            if (conn1 != null)
+            {
+               conn1.close();
+            }
+         }
+         finally
+         {
+            if (conn2 != null)
+            {
+               conn2.close();
+            }
+         }
+      }      
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   /**
+    * Command executed against the test server that triggers the connection manager's
+    * client-failure handling for the single client it expects to find registered.
+    */
+   public static class ServerClientFailureCommand implements Command
+   {
+      private static final long serialVersionUID = 2603154447586447658L;
+
+      /**
+       * Looks up the only registered client entry and reports it as failed.
+       *
+       * @param server the server whose connection manager is driven
+       * @return always null
+       * @throws Exception if the server peer or its connection manager cannot be used
+       */
+      public Object execute(Server server) throws Exception
+      {
+         SimpleConnectionManager connMgr =
+            (SimpleConnectionManager)server.getServerPeer().getConnectionManager();
+
+         //the test is expected to have exactly one client registered ...
+         Map clients = connMgr.getClients();
+         assertEquals(1, clients.size());
+
+         //... holding exactly one entry, whose key is handed to the failure handler
+         Map endpointsOfClient = (Map)clients.values().iterator().next();
+         assertEquals(1, endpointsOfClient.size());
+         String failedSessionId = (String)endpointsOfClient.keySet().iterator().next();
+
+         // triggering server side clean up
+         connMgr.handleClientFailure(failedSessionId);
+         return null;
+      }
+   }
+}

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2009-05-29 01:55:48 UTC (rev 7121)
@@ -3247,5 +3247,215 @@
          }    
       }
    }
+
+   /*
+    * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+    */
+   public void testReceivingMessageOfPreparedTransaction() throws Exception
+   {
+      log.trace("starting testReceivingMessageOfPreparedTransaction");
+
+      Connection senderConn = null;
+      XAConnection preparingConn = null;
+      XAConnection probeConn = null;
+      XAConnection recoveryConn = null;
+      Connection verifyConn = null;
+
+      try
+      {
+         //Step 1: put one persistent message on the queue
+         senderConn = cf.createConnection();
+         Session senderSession = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = senderSession.createProducer(queue4);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         TextMessage sent = senderSession.createTextMessage("TxM1");
+         producer.send(sent);
+
+         //Step 2: consume it inside an XA transaction and leave that transaction prepared.
+         //The test plays the transaction manager by driving the XAResource directly.
+         preparingConn = cf.createXAConnection();
+         XASession preparingSession = preparingConn.createXASession();
+         XAResource preparingRes = preparingSession.getXAResource();
+         Xid preparedXid = new MessagingXid("tx1".getBytes(), 42, "abcdef".getBytes());
+         preparingRes.start(preparedXid, XAResource.TMNOFLAGS);
+
+         MessageConsumer preparingConsumer = preparingSession.createConsumer(queue4);
+         preparingConn.start();
+
+         TextMessage consumed = (TextMessage)preparingConsumer.receive(1000);
+         assertNotNull(consumed);
+         assertEquals(sent.getText(), consumed.getText());
+
+         preparingRes.end(preparedXid, XAResource.TMSUCCESS);
+         preparingRes.prepare(preparedXid);
+
+         senderConn.close();
+         preparingConn.close();
+         senderConn = null;
+         preparingConn = null;
+
+         //Step 3: a second XA consumer must not receive the message while the first
+         //transaction is only prepared
+         probeConn = cf.createXAConnection();
+         XASession probeSession = probeConn.createXASession();
+         XAResource probeRes = probeSession.getXAResource();
+         Xid probeXid = new MessagingXid("tx2".getBytes(), 42, "ghijkl".getBytes());
+         probeRes.start(probeXid, XAResource.TMNOFLAGS);
+
+         MessageConsumer probeConsumer = probeSession.createConsumer(queue4);
+         probeConn.start();
+
+         TextMessage unexpected = (TextMessage)probeConsumer.receive(3000);
+         assertNull(unexpected);
+
+         probeRes.end(probeXid, XAResource.TMSUCCESS);
+         probeRes.prepare(probeXid);
+         probeRes.commit(probeXid, false);
+
+         probeConn.close();
+         probeConn = null;
+
+         //Step 4: bounce the server peer and redeploy the administered objects
+         ServerManagement.stopServerPeer();
+         ServerManagement.startServerPeer();
+         deployAndLookupAdministeredObjects();
+
+         //Step 5: recovery must report exactly the transaction prepared in step 2
+         recoveryConn = cf.createXAConnection();
+         XASession recoverySession = recoveryConn.createXASession();
+         XAResource recoveryRes = recoverySession.getXAResource();
+
+         Xid[] recovered = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, recovered.length);
+
+         Xid[] scanEnd = recoveryRes.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, scanEnd.length);
+
+         assertEquals(preparedXid, recovered[0]);
+
+         //commit the recovered tx
+         recoveryRes.commit(recovered[0], false);
+
+         recoveryConn.close();
+         recoveryConn = null;
+
+         //Step 6: the message should never be received again
+         verifyConn = cf.createConnection();
+         Session verifySession = verifyConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer verifyConsumer = verifySession.createConsumer(queue4);
+         verifyConn.start();
+
+         Message leftover = verifyConsumer.receive(1000);
+         assertNull(leftover);
+
+         verifyConn.close();
+         verifyConn = null;
+      }
+      finally
+      {
+         //Best-effort close of whatever is still open, in creation order
+         Connection[] stillOpen =
+            new Connection[] { senderConn, preparingConn, probeConn, recoveryConn, verifyConn };
+
+         for (int i = 0; i < stillOpen.length; i++)
+         {
+            if (stillOpen[i] == null)
+            {
+               continue;
+            }
+
+            try
+            {
+               stillOpen[i].close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+      }
+   }
+
 }
 




More information about the jboss-cvs-commits mailing list