[jboss-cvs] JBoss Messaging SVN: r7121 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456: src/etc/remoting and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 28 21:55:49 EDT 2009
Author: jbertram at redhat.com
Date: 2009-05-28 21:55:48 -0400 (Thu, 28 May 2009)
New Revision: 7121
Added:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
Log:
[JBPAPP-2030]
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/docs/userguide/en/modules/configuration.xml 2009-05-29 01:55:48 UTC (rev 7121)
@@ -1748,6 +1748,8 @@
<attribute name="SlowConsumers">false</attribute>
<attribute name="StrictTck">true</attribute>
+
+ <attribute name="SendAcksAsync">false</attribute>
<attribute name="DefaultTempQueueFullSize">50000</attribute>
@@ -1891,42 +1893,174 @@
bisocket transport to communicate.</para>
</section>
</section>
- <!-- End conf.connectionfactory.attributes -->
- </section>
- <!-- End conf.connectionfactory -->
- <section id="conf.connector">
- <title>Configuring the remoting connector</title>
- <para>JBoss Messaging uses JBoss Remoting for all client to server
- communication. For full details of what JBoss Remoting is capable of and
- how it is configured please consult the JBoss Remoting
- documentation.</para>
- <para>The default configuration includes a single remoting connector
- which is used by the single default connection factory. Each connection
- factory can be configured to use its own connector.</para>
- <para>The default connector is configured to use the remoting bisocket
- transport. The bisocket transport is a TCP socket based transport which
- only listens and accepts connections on the server side. I.e.
- connections are always initiated from the client side. This means it
- works well in typical firewall scenarios where only inbound connections
- are allowed on the server. Or where onlu outbound connections are
- allowed from the client.</para>
- <para>The bisocket transport can be configured to use SSL where a higher
- level of security is required.</para>
- <para>The other supported transport is the HTTP transport. This uses the
- HTTP protocol to communicate between client and server. Data is received
- on the client by the client periodically polling the server for
- messages. This transport is well suited to situations where there is a
- firewall between client and server which only allows incoming HTTP
- traffic on the server. Please note this transport will not be as
- performant as the bisocket transport due to the nature of polling and
- the HTTP protocl. Also please note it is not designed for high load
- situations.</para>
- <para>No other remoting transports are currently supported by JBoss
- Messaging</para>
- <para>You can look at remoting configuration under:</para>
- <para><JBoss>/server/<YourMessagingServer>/deploy/jboss-messaging.sar/remoting-bisocket-service.xml</para>
- <para>Here is an example bisocket remoting configuration:
- <programlisting>
+ <section id="conf.connectionfactory.attributes.tckstrictbehavior">
+ <title>StrictTck</title>
+
+ <para>Set this to true if you want strict JMS behaviour as required by
+ the TCK.</para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.sendacksasync">
+ <title>SendAcksAsync</title>
+
+ <para>Set this to true if you want acknowledgements to be sent asynchronously. This can speed up performance especially if you are using auto_acknowledge mode</para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.tempqueuepaging">
+ <title>Temporary queue paging parameters</title>
+
+ <para>DefaultTempQueueFullSize, DefaultTempQueuePageSize,
+ DefaultTempQueueDownCacheSize are optional attributes that determine
+ the default paging parameters to be used for any temporary
+ destinations scoped to connections created using this connection
+ factory. See the section on paging channels for more information on
+ what these values mean. They will default to values of 200000, 2000
+ and 2000 respectively if ommitted.</para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.dupsokbatchsize">
+ <title>DupsOKBatchSize</title>
+
+ <para>When using a session with acknowledge mode of
+ DUPS_OK_ACKNOWLEDGE this setting determines how many acknowledgments
+ it will buffer locally before sending. The default value is
+ <literal>2000</literal></para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.supportsloadbalancing">
+ <title>SupportsLoadBalancing</title>
+
+ <para>When using a connection factory with a clustered JBoss Messaging
+ installation you can choose whether to enable client side connection
+ load-balancing. This is determined by setting the attribute
+ supportsLoadBalancing on the connection factory.</para>
+
+ <para>If load balancing is enabled on a connection factory then any
+ connections created with that connection factory will be load-balanced
+ across the nodes of the cluster. Once a connection is created on a
+ particular node, it stays on that node.</para>
+
+ <para>The exact policy that determines how connections are load
+ balanced is determined by the LoadBalancingFactory attribute</para>
+
+ <para>The default value is <literal>false</literal></para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.supportsfailover">
+ <title>SupportsFailover</title>
+
+ <para>When using a connection factory with a clustered JBoss Messaging
+ installation you can choose whether to enable client side automatic
+ failover. This is determined by setting the attribute supportsFailover
+ on the connection factory.</para>
+
+ <para>If automatic failover is enabled on a connection factory, then
+ if a connection problem is detected with the connection then JBoss
+ Messaging will automatically and transparently failover to another
+ node in the cluster.</para>
+
+ <para>The failover is transparent meaning the user can carry on using
+ the sessions, consumers, producers and connection objects as
+ before.</para>
+
+ <para>If automatic failover is not required, then this attribute can
+ be set to false. With automatic failover disabled it is up to the user
+ code to catch connection exceptions in synchronous JMS operations and
+ install a JMS ExceptionListener to catch exceptions asynchronously.
+ When a connection is caught, the client side code should lookup a new
+ connection factory using HAJNDI and recreate the connection using
+ that.</para>
+
+ <para>The default value is <literal>false</literal></para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.disableremotingchecks">
+ <title>DisableRemotingChecks</title>
+
+ <para>By default, when deploying a connection factory, JBoss Messaging
+ checks that the corresponding JBoss Remoting Connector has "sensible"
+ values. JBoss Messaging is very sensitive to the values and for many
+ of them there's rarely a good reason to change them. To disable such
+ sanity checking set this to false. <warning>
+ There is rarely a good reason to disable checking. Only do so if you are absolutely sure in what you are doing
+ </warning></para>
+
+ <para>The default value is <literal>false</literal></para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.loadbalancingfactory">
+ <title>LoadBalancingFactory</title>
+
+ <para>If you are using a connection factory with client side load
+ balancing then you can specify how the load balancing is implemented
+ by overriding this attribute. The value must correspond to the name of
+ a class which implements the interface
+ org.jboss.jms.client.plugin.LoadBalancingFactory</para>
+
+ <para>The default value is
+ org.jboss.jms.client.plugin.RoundRobinLoadBalancingFactory, which load
+ balances connetions across the cluster in a round-robin fashion</para>
+ </section>
+
+ <section id="conf.connectionfactory.attributes.connector">
+ <title>Connector</title>
+
+ <para>This specifies which remoting connector this connection factory
+ uses. Different connection factories can use different
+ connectors.</para>
+
+ <para>For instance you could deploy one connection factory that
+ creates connections that use the HTTP transport to communicate to the
+ server and another that creates connections that use the bisocket
+ transport to communicate.</para>
+ </section>
+ </section>
+
+ <!-- End conf.connectionfactory.attributes -->
+ </section>
+
+ <!-- End conf.connectionfactory -->
+
+ <section id="conf.connector">
+ <title>Configuring the remoting connector</title>
+
+ <para>JBoss Messaging uses JBoss Remoting for all client to server
+ communication. For full details of what JBoss Remoting is capable of and
+ how it is configured please consult the JBoss Remoting
+ documentation.</para>
+
+ <para>The default configuration includes a single remoting connector which
+ is used by the single default connection factory. Each connection factory
+ can be configured to use its own connector.</para>
+
+ <para>The default connector is configured to use the remoting bisocket
+ transport. The bisocket transport is a TCP socket based transport which
+ only listens and accepts connections on the server side. I.e. connections
+ are always initiated from the client side. This means it works well in
+ typical firewall scenarios where only inbound connections are allowed on
+ the server. Or where onlu outbound connections are allowed from the
+ client.</para>
+
+ <para>The bisocket transport can be configured to use SSL where a higher
+ level of security is required.</para>
+
+ <para>The other supported transport is the HTTP transport. This uses the
+ HTTP protocol to communicate between client and server. Data is received
+ on the client by the client periodically polling the server for messages.
+ This transport is well suited to situations where there is a firewall
+ between client and server which only allows incoming HTTP traffic on the
+ server. Please note this transport will not be as performant as the
+ bisocket transport due to the nature of polling and the HTTP protocl. Also
+ please note it is not designed for high load situations.</para>
+
+ <para>No other remoting transports are currently supported by JBoss
+ Messaging</para>
+
+ <para>You can look at remoting configuration under:</para>
+
+ <para><JBoss>/server/<YourMessagingServer>/deploy/jboss-messaging.sar/remoting-bisocket-service.xml</para>
+
+ <para>Here is an example bisocket remoting configuration: <programlisting>
<config>
<invoker transport="bisocket">
@@ -2016,4 +2150,4 @@
the service binding manager for JBoss Messaging</para>
</section>
<!-- End conf.callback -->
-</chapter>
\ No newline at end of file
+</chapter>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-bisocket-service.xml 2009-05-29 01:55:48 UTC (rev 7121)
@@ -38,7 +38,10 @@
<attribute name="stopLeaseOnFailure" isParam="true">true</attribute>
<!-- Periodicity of client pings. Server window by default is twice this figure -->
- <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+ <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+
+ <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+ <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
<attribute name="timeout" isParam="true">0</attribute>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/remoting/remoting-sslbisocket-service.xml 2009-05-29 01:55:48 UTC (rev 7121)
@@ -36,7 +36,10 @@
<attribute name="stopLeaseOnFailure" isParam="true">true</attribute>
<!-- Periodicity of client pings. Server window by default is twice this figure -->
- <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+ <attribute name="clientLeasePeriod" isParam="true">10000</attribute>
+
+ <attribute name="failureDisconnectTimeout" isParam="true">0</attribute>
+ <attribute name="useClientConnectionIdentity" isParam="true">true</attribute>
<attribute name="timeout" isParam="true">0</attribute>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/server/default/deploy/connection-factories-service.xml 2009-05-29 01:55:48 UTC (rev 7121)
@@ -120,6 +120,10 @@
<attribute name="StrictTck">true</attribute>
+ <!- - Should acknowledgements be sent asynchronously? - ->
+
+ <attribute name="SendAcksAsync">false</attribute>
+
<!- - Disable JBoss Remoting Connector sanity checks - There is rarely a good reason to set this to true - ->
<attribute name="DisableRemotingChecks">false</attribute>
@@ -146,4 +150,4 @@
-->
-</server>
\ No newline at end of file
+</server>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2009-05-29 01:55:48 UTC (rev 7121)
@@ -118,6 +118,12 @@
<type>boolean</type>
</attribute>
+ <attribute access="read-write" getMethod="isSendAcksAsync" setMethod="setSendAcksAsync">
+ <description>Should acknowledgements be sent asynchronously?</description>
+ <name>SendAcksAsync</name>
+ <type>boolean</type>
+ </attribute>
+
<attribute access="read-write" getMethod="isDisableRemotingChecks" setMethod="setDisableRemotingChecks">
<description>Disable remoting connector parameter sanity checks. There is rarely a good reason to set this to true</description>
<name>DisableRemotingChecks</name>
@@ -146,4 +152,4 @@
<name>destroy</name>
</operation>
-</mbean>
\ No newline at end of file
+</mbean>
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -31,7 +31,6 @@
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.BrowserState;
import org.jboss.jms.client.state.ConnectionState;
@@ -92,14 +91,6 @@
Version versionToUse = connectionDelegate.getVersionToUse();
JMSRemotingConnection remotingConnection = connectionDelegate.getRemotingConnection();
- // install the consolidated remoting connection listener; it will be de-installed on
- // connection closing by ConnectionAspect
-
- ConsolidatedRemotingConnectionListener listener =
- new ConsolidatedRemotingConnectionListener();
-
- remotingConnection.addConnectionListener(listener);
-
if (versionToUse == null)
{
throw new IllegalStateException("Connection version is null");
@@ -109,7 +100,8 @@
new ConnectionState(serverID, connectionDelegate,
remotingConnection, versionToUse);
- listener.setConnectionState(connectionState);
+ remotingConnection.getConnectionListener().setConnectionState(connectionState);
+ remotingConnection.getConnectionListener().start();
connectionDelegate.setState(connectionState);
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -95,7 +95,7 @@
if (trace) log.trace("Trying communication on server(" + server + ")=" + delegates[server].getServerLocatorURI());
try
{
- remoting = new JMSRemotingConnection(delegates[server].getServerLocatorURI(), true, delegates[server].getStrictTck());
+ remoting = new JMSRemotingConnection(delegates[server].getServerLocatorURI(), true, delegates[server].getStrictTck(), delegates[server].isSendAcksAsync());
remoting.start();
currentDelegate = delegates[server];
if (trace) log.trace("Adding callback");
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -30,6 +30,7 @@
import javax.jms.JMSException;
import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
@@ -77,6 +78,8 @@
private boolean strictTck;
+ private boolean sendAcksAsync;
+
// Static ---------------------------------------------------------------------------------------
/*
@@ -106,7 +109,8 @@
// Constructors ---------------------------------------------------------------------------------
public ClientConnectionFactoryDelegate(String uniqueName, String objectID, int serverID, String serverLocatorURI,
- Version serverVersion, boolean clientPing, boolean strictTck)
+ Version serverVersion, boolean clientPing, boolean strictTck,
+ boolean sendAcksAsync)
{
super(objectID);
@@ -116,6 +120,7 @@
this.serverVersion = serverVersion;
this.clientPing = clientPing;
this.strictTck = strictTck;
+ this.sendAcksAsync = sendAcksAsync;
}
public ClientConnectionFactoryDelegate()
@@ -149,7 +154,7 @@
try
{
- remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck);
+ remotingConnection = new JMSRemotingConnection(serverLocatorURI, clientPing, strictTck, new ConsolidatedRemotingConnectionListener(), sendAcksAsync);
remotingConnection.start();
@@ -271,8 +276,13 @@
return strictTck;
}
- public void synchronizeWith(DelegateSupport newDelegate) throws Exception
+ public boolean isSendAcksAsync()
{
+ return this.sendAcksAsync;
+ }
+
+ public void synchronizeWith(DelegateSupport newDelegate) throws Exception
+ {
super.synchronizeWith(newDelegate);
}
@@ -327,6 +337,8 @@
clientPing = in.readBoolean();
strictTck = in.readBoolean();
+
+ sendAcksAsync = in.readBoolean();
}
public void write(DataOutputStream out) throws Exception
@@ -342,6 +354,8 @@
out.writeBoolean(clientPing);
out.writeBoolean(strictTck);
+
+ out.writeBoolean(sendAcksAsync);
}
// Inner Classes --------------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -93,6 +93,8 @@
private int dupsOKBatchSize;
private boolean strictTck;
+
+ private boolean sendAcksAsync;
// Static ---------------------------------------------------------------------------------------
@@ -132,6 +134,8 @@
onewayClient = conn.getOnewayClient();
strictTck = conn.isStrictTck();
+
+ sendAcksAsync = conn.isSendAcksAsync();
}
public void setState(HierarchicalState state)
@@ -145,6 +149,8 @@
onewayClient = conn.getOnewayClient();
strictTck = conn.isStrictTck();
+
+ sendAcksAsync = conn.isSendAcksAsync();
}
// Closeable implementation ---------------------------------------------------------------------
@@ -169,14 +175,30 @@
{
RequestSupport req = new SessionAcknowledgeDeliveryRequest(id, version, ack);
- return ((Boolean)doInvoke(client, req)).booleanValue();
+ if (sendAcksAsync)
+ {
+ doInvokeOneway(onewayClient, req);
+
+ return true;
+ }
+ else
+ {
+ return ((Boolean)doInvoke(client, req)).booleanValue();
+ }
}
public void acknowledgeDeliveries(List acks) throws JMSException
{
RequestSupport req = new SessionAcknowledgeDeliveriesRequest(id, version, acks);
-
- doInvoke(client, req);
+
+ if (sendAcksAsync)
+ {
+ doInvokeOneway(onewayClient, req);
+ }
+ else
+ {
+ doInvoke(client, req);
+ }
}
/**
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -42,6 +42,8 @@
private ExceptionListener jmsExceptionListener;
private ConnectionFailureListener remotingListener;
+
+ private boolean started;
// Constructors ---------------------------------------------------------------------------------
@@ -53,6 +55,11 @@
public void handleConnectionException(Throwable throwable, Client client)
{
+ if (!started)
+ {
+ return;
+ }
+
// forward the exception to delegate listener and JMS ExceptionListeners; synchronize
// to avoid race conditions
@@ -162,6 +169,11 @@
}
return state + ".ConsolidatedListener";
}
+
+ public void start()
+ {
+ started = true;
+ }
// Package protected ----------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -244,6 +244,7 @@
private InvokerLocator serverLocator;
private CallbackManager callbackManager;
private boolean strictTck;
+ private boolean sendAcksAsync;
// When a failover is performed, this flag is set to true
protected boolean failed = false;
@@ -254,12 +255,20 @@
// Constructors ---------------------------------------------------------------------------------
- public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck) throws Exception
+ public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck,
+ boolean sendAcksAsync) throws Exception
{
+ this(serverLocatorURI, clientPing, strictTck, null, sendAcksAsync);
+ }
+
+ public JMSRemotingConnection(String serverLocatorURI, boolean clientPing, boolean strictTck, ConsolidatedRemotingConnectionListener listener, boolean sendAcksAsync) throws Exception
+ {
serverLocator = new InvokerLocator(serverLocatorURI);
this.clientPing = clientPing;
this.strictTck = strictTck;
-
+ this.sendAcksAsync = sendAcksAsync;
+ this.remotingConnectionListener = listener;
+
log.trace(this + " created");
}
@@ -316,7 +325,14 @@
{
public Object run() throws Exception
{
- client.connect();
+ if (remotingConnectionListener != null)
+ {
+ client.connect(remotingConnectionListener, serverLocator.getParameters());
+ }
+ else
+ {
+ client.connect();
+ }
onewayClient.connect();
return null;
}
@@ -345,7 +361,7 @@
public void stop()
{
log.trace(this + " stop");
-
+
// explicitly remove the callback listener, to avoid race conditions on server
// (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
@@ -408,6 +424,11 @@
{
return strictTck;
}
+
+ public boolean isSendAcksAsync()
+ {
+ return sendAcksAsync;
+ }
public synchronized boolean isFailed()
{
@@ -421,6 +442,11 @@
public synchronized void setFailed()
{
failed = true;
+
+ if (client == null)
+ {
+ return;
+ }
// Remoting has the bad habit of letting the job of cleaning after a failed connection up to
// the application. Here, we take care of that, by disconnecting the remoting client, and
@@ -429,7 +455,7 @@
try
{
- client.setDisconnectTimeout(0);
+ client.setDisconnectTimeout(0);
}
catch (Throwable ignore)
{
@@ -465,7 +491,7 @@
return true;
}
- public synchronized void addPlainConnectionListener(ConnectionListener listener)
+ public synchronized void addPlainConnectionListener(final ConnectionListener listener)
{
client.addConnectionListener(listener);
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -49,7 +49,8 @@
boolean supportsFailover,
boolean supportsLoadBalancing,
LoadBalancingFactory loadBalancingPolicy,
- boolean strictTck) throws Exception;
+ boolean strictTck,
+ boolean sendAcksAsync) throws Exception;
void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing) throws Exception;
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/ServerPeer.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -1178,7 +1178,10 @@
{
if (sessions.remove(id) == null)
{
- throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+ //here we don't throw exception as the session may have been removed already due to server side
+ //failure handler (SimpleConnectionManager), which deemed to be normal behavior in application environment.
+ if (log.isTraceEnabled()) { log.trace("Cannot find session with id " + id + " to remove"); }
+ // throw new IllegalStateException("Cannot find session with id " + id + " to remove");
}
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -78,6 +78,8 @@
private boolean strictTck;
+ private boolean sendAcksAsync;
+
private boolean disableRemotingChecks;
// Constructors ---------------------------------------------------------------------------------
@@ -202,7 +204,7 @@
locatorURI, enablePing, prefetchSize, slowConsumers,
defaultTempQueueFullSize, defaultTempQueuePageSize,
defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
- loadBalancingFactory, strictTck);
+ loadBalancingFactory, strictTck, sendAcksAsync);
String info = "Connector " + locator.getProtocol() + "://" +
locator.getHost() + ":" + locator.getPort();
@@ -429,6 +431,22 @@
this.strictTck = strictTck;
}
+ public boolean isSendAcksAsync()
+ {
+ return this.sendAcksAsync;
+ }
+
+ public void setSendAcksAsync(boolean async)
+ {
+ if (started)
+ {
+ log.warn("SendAcksAsync can only be changed when connection factory is stopped");
+ return;
+ }
+
+ this.sendAcksAsync = async;
+ }
+
public boolean isDisableRemotingChecks()
{
return disableRemotingChecks;
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -121,7 +121,8 @@
boolean supportsFailover,
boolean supportsLoadBalancing,
LoadBalancingFactory loadBalancingFactory,
- boolean strictTck)
+ boolean strictTck,
+ boolean sendAcksAsync)
throws Exception
{
log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
@@ -175,7 +176,8 @@
ClientConnectionFactoryDelegate localDelegate =
new ClientConnectionFactoryDelegate(uniqueName, id, serverPeer.getServerPeerID(),
- locatorURI, version, clientPing, useStrict);
+ locatorURI, version, clientPing, useStrict,
+ sendAcksAsync);
log.debug(this + " created local delegate " + localDelegate);
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -29,8 +29,6 @@
import java.util.Map;
import java.util.Set;
-import javax.jms.JMSException;
-
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
@@ -167,6 +165,7 @@
return e;
}
+
return null;
}
@@ -221,17 +220,20 @@
{
if (t instanceof ClientDisconnectedException)
{
- // This is OK
- if (trace) { log.trace(this + " notified that client " + client + " has disconnected"); }
- return;
+ if (log.isTraceEnabled())
+ {
+ log.trace("Connection is closed normally: " + client);
+ }
}
else
{
- if (trace) { log.trace(this + " detected failure on client " + client, t); }
+ if (log.isTraceEnabled())
+ {
+ log.trace("Connection is closed on failure event: " + client);
+ }
}
+ String remotingSessionID = client.getSessionId();
- String remotingSessionID = client.getSessionId();
-
if (remotingSessionID != null)
{
handleClientFailure(remotingSessionID);
@@ -401,7 +403,7 @@
try
{
- ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+ ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
}
catch (Throwable ignore)
{
@@ -421,10 +423,10 @@
{
String jmsClientID = remotingSessions.get(jmsSessionID);
- log.warn("A problem has been detected " +
- "with the connection to remote client " +
- jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
- "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+ log.trace("A problem has been detected " +
+ "with the connection to remote client " +
+ jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+ "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
if (jmsClientID != null)
{
@@ -467,7 +469,6 @@
catch (Throwable ignore)
{
}
-
return;
}
}
@@ -493,7 +494,7 @@
try
{
- ((ServerInvokerCallbackHandler)entry.getValue()).destroy();
+ ((ServerInvokerCallbackHandler)entry.getValue()).shutdown();
}
catch (Throwable ignore)
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -95,7 +95,7 @@
// Attributes -----------------------------------------------------------------------------------
private String id;
-
+
private volatile boolean closed;
private volatile boolean started;
@@ -366,6 +366,13 @@
{
try
{
+ //reason for synchronization
+ //Sometimes the server side detects a connection failure but
+ //client side is normal. So it's possible the client side is calling
+ //connection.close() while in the mean time the server side connection
+ //failure handler call it also.
+ synchronized (this)
+ {
if (trace) { log.trace(this + " close()"); }
if (closed)
@@ -437,11 +444,15 @@
temporaryDestinations.clear();
}
+ closed = true;
+ }
+
+ //we put this outside the sync loop to avoid dead lock where
+ //SimpleConnectionManager.handleClientFailure() holds itself and then tries to call this close(), which requires lock on this
+ //meanwhile this close() (called from client) holds itself and call unregisterConnection(), which requires lock on SimpleConnectionManager.
cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
Dispatcher.instance.unregisterTarget(id, this);
-
- closed = true;
}
catch (Throwable t)
{
@@ -649,7 +660,10 @@
{
if (sessions.remove(sessionId) == null)
{
- throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+ //Here not to throw exception, because it is possible that the session close can be
+ //called from server side (SimpleConnectionManager) before client side.
+ if (trace) { log.trace("Cannot find session with id " + sessionId + " to remove"); }
+ //throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
}
}
}
@@ -752,7 +766,7 @@
else if (dest.isQueue())
{
if (trace) { log.trace(this + " routing " + msg + " to queue"); }
- if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
+ if (!postOffice.route(ref, new JMSCondition(true, dest.getName()), tx))
{
throw new JMSException("Failed to route " + ref + " to " + dest.getName());
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -1119,16 +1119,22 @@
{
if (consumers.remove(consumerId) == null)
{
- throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+ if (trace) { log.trace("Cannot find consumer with id " + consumerId + " to remove"); }
+ //don't throw, as it maybe called twice from client and server's connection failure handler.
+ //throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
}
}
}
void localClose() throws Throwable
{
+
if (closed)
{
- throw new IllegalStateException("Session is already closed");
+ //don't throw the exception as it maybe called twice
+ if (trace) { log.trace("Session is already closed. "); }
+ return;
+ //throw new IllegalStateException("Session is already closed");
}
if (trace) log.trace(this + " close()");
@@ -1167,10 +1173,12 @@
//Note we don't maintain order using a LinkedHashMap since then we lose
//concurrency since we would have to lock it exclusively
- List entries = new ArrayList(deliveries.entrySet());
+ synchronized (deliveries)
+ {
+ List entries = new ArrayList(deliveries.entrySet());
- //Sort them in reverse delivery id order
- Collections.sort(entries,
+ //Sort them in reverse delivery id order
+ Collections.sort(entries,
new Comparator()
{
public int compare(Object obj1, Object obj2)
@@ -1183,39 +1191,46 @@
}
});
- Iterator iter = entries.iterator();
+ Iterator iter = entries.iterator();
- Set channels = new HashSet();
+ Set channels = new HashSet();
- if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
+ if (trace) { log.trace(this + " cancelling " + entries.size() + " deliveries"); }
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
- if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
+ if (trace) { log.trace(this + " cancelling delivery with delivery id: " + entry.getKey()); }
- DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+ DeliveryRecord rec = (DeliveryRecord)entry.getValue();
- rec.del.cancel();
+ /*
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+ */
+ if (!rec.del.isXAPrepared())
+ {
+ rec.del.cancel();
+ }
- channels.add(rec.del.getObserver());
- }
+ channels.add(rec.del.getObserver());
+ }
- promptDelivery(channels);
+ promptDelivery(channels);
- //Close down the executor
+ //Close down the executor
- //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
- //prompter is queued and starts to execute
- //prompter almost finishes executing then a message is cancelled due to this session closing
- //this causes another prompter to be queued
- //shutdownAfterProcessingCurrentTask is then called
- //this means the second prompter never runs and the cancelled message doesn't get redelivered
- executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+ //Note we need to wait for ALL tasks to complete NOT just one otherwise we can end up with the following situation
+ //prompter is queued and starts to execute
+ //prompter almost finishes executing then a message is cancelled due to this session closing
+ //this causes another prompter to be queued
+ //shutdownAfterProcessingCurrentTask is then called
+ //this means the second prompter never runs and the cancelled message doesn't get redelivered
+ executor.shutdownAfterProcessingCurrentlyQueuedTasks();
- deliveries.clear();
-
+ deliveries.clear();
+ }
+
sp.removeSession(id);
Dispatcher.instance.unregisterTarget(id, this);
@@ -1225,7 +1240,11 @@
void cancelDelivery(long deliveryId) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ DeliveryRecord rec = null;
+ synchronized(deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(deliveryId));
+ }
if (rec == null)
{
@@ -1436,7 +1455,7 @@
{
// one way invocation, no acknowledgment sent back by the client
if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-
+
callbackHandler.handleCallbackOneway(callback);
//We store the delivery id so we know to wait for any deliveries in transit on close
@@ -1569,7 +1588,11 @@
private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
{
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ DeliveryRecord rec = null;
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ }
if (rec == null)
{
@@ -1717,7 +1740,19 @@
{
if (trace) { log.trace(this + " acknowledging delivery " + ack); }
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ DeliveryRecord rec = null;
+
+ //I put synchronized here to prevent the following from happening:
+ //a clustered server node detects connection failure and cancel deliveries.
+ //but the consumer on it get through to here
+ //if not synchronized, the remove may get the record before the above cancel action clear up the deliveries map.
+ //so the cancel action makes the message back to queue and this method cause the delivery count to decrement.
+ //as the cancel will decrease the delivery count once, so this will result the delivery count being decremented twice
+ //for one same message.
+ synchronized (deliveries)
+ {
+ rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ }
if (rec == null)
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/contract/Delivery.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -48,4 +48,9 @@
void cancel() throws Throwable;
boolean isRecovered();
+
+ /**
+ * Mark if this delivery is with a prepared XA transaction.
+ */
+ boolean isXAPrepared();
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -283,6 +283,13 @@
acknowledgeInternal(d, null, false);
}
+ /*
+ * Note: If a XA tx is not committed while failure happens, cancelling of the
+ * delivery shouldn't put the message with transactions back to re-deliver.
+ * It must be there in DB until the transaction recovery happens.
+ *
+ * @see org.jboss.messaging.core.contract.DeliveryObserver#cancel(org.jboss.messaging.core.contract.Delivery)
+ */
public void cancel(Delivery del) throws Throwable
{
//We may need to update the delivery count in the database
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -50,6 +50,7 @@
private DeliveryObserver observer;
private MessageReference reference;
private boolean recovered;
+ private Transaction tx;
private boolean trace = log.isTraceEnabled();
@@ -73,6 +74,7 @@
this.observer = observer;
this.selectorAccepted = selectorAccepted;
this.recovered = recovered;
+ this.tx = null;
}
// Delivery implementation ----------------------------------------------------------------------
@@ -95,6 +97,8 @@
public void acknowledge(Transaction tx) throws Throwable
{
if (trace) { log.trace(this + " acknowledging delivery " + ( tx == null ? "non-transactionally" : "in " + tx)); }
+
+ this.tx = tx;
observer.acknowledge(this, tx);
}
@@ -118,6 +122,20 @@
return "Delivery" + (reference == null ? "" : "[" + reference + "]");
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.contract.Delivery#isXA()
+ */
+ public boolean isXAPrepared()
+ {
+ if (tx != null) {
+ if (tx.getXid() != null)
+ {
+ return tx.getState() == Transaction.STATE_PREPARED;
+ }
+ }
+ return false;
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -32,12 +32,12 @@
* A NamedThreadQueuedExecutor
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @deprecated
*
*/
public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
- private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-
+{
private final String name;
private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
@@ -49,10 +49,6 @@
this.name = name;
setThreadFactory(new Factory());
-
- clearThread();
-
- restart();
}
private class Factory implements ThreadFactory
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (from rev 6192, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/DeliveryOnConnectionFailureTest.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -0,0 +1,291 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
+import org.jboss.remoting.Client;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.Command;
+import org.jboss.test.messaging.tools.container.Server;
+
+/**
+ * A DeliveryOnConnectionFailureTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Mar 26, 2009 3:14:28 PM
+ *
+ */
+public class DeliveryOnConnectionFailureTest extends JMSTestCase
+{
+
+ public DeliveryOnConnectionFailureTest(String name)
+ {
+ super(name);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+ //Message Stuck means messages are kept in delivering state and never be delivered again
+ //unless the server is restarted (for persistent messages).
+ //this can happen in the following conditions:
+ //1. The client ping timeout and JBM tries to disconnect from the server (this happens in cluster).
+ //2. Due to the network/remoting issue, the server will receive a 'normal' disconnection event
+ //3. The server assumes the client has already closed it's connection and therefore doesn't clean up
+ //4. So the connection at the server is left open, including the sessions created on that connection.
+ //5. If the sessions contains messages in delivering, those messages will appear stuck.
+ //To fix this, either the server side always tries to clean up the connection whenever a disconnection happens
+ //or the remoting layer handle this correctly.
+ //
+ //Here we simplify the situation. First start the server and get a connection to it. Then
+ //we send a message to the server with client ack. We receive it without ack,
+ //next we directly call the client.disconnect() from client without closing the connection
+ //the server should cancel the message. Then we receive the message and ack it.
+ public void testMessageStuckOnConnectionFailure() throws Exception
+ {
+ ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ JBossConnection conn1 = null;
+ JBossConnection conn2 = null;
+
+ try
+ {
+ //create a connection
+ conn1 = (JBossConnection)cf.createConnection();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer prod1 = sess1.createProducer(queue1);
+ TextMessage msg = sess1.createTextMessage("dont-stuck-me!");
+ conn1.start();
+
+ //send a message
+ prod1.send(msg);
+
+ //receive the message but not ack
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage rm = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(rm);
+ assertEquals("dont-stuck-me!", rm.getText());
+
+ //break connection.
+ JMSRemotingConnection jmsConn = ((ClientConnectionDelegate)conn1.getDelegate()).getRemotingConnection();
+ Client rmClient = jmsConn.getRemotingClient();
+ rmClient.disconnect();
+
+ //wait for server side cleanup
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore.
+ }
+
+ //now receive the message
+ conn2 = (JBossConnection)cf.createConnection();
+ conn2.start();
+ Session sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ TextMessage rm2 = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(rm2);
+ assertEquals("dont-stuck-me!", rm2.getText());
+ rm2.acknowledge();
+
+ //Message count should be zero.
+ //this is checked in tearDown().
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+
+ }
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1456
+ //another issue with jira 1456 is negative message count.
+ //This test guarantees the message count won't go negative
+ //Error Scenario:
+ // 1. Server side detects the connection failure (lease timeout) and close the connection/session
+ // 2. The session endpoint will cancel the messages being delivered to the queue.
+ // 3. At the same time the client side received some of the messages and acknowledge them
+ // 4. The acknowledge action will decrease the delivering count of the queue, and the session endpoint
+ // cancel also decrease the delivering count.
+ // 5. If not synchronized, one message may be canceled and acked at the same time, so the delivering count
+ // will be decreased twice for each message, resulting in negative message count.
+ //
+ //The test first creates a connection and send messages, then it receives the messages, then ack the last
+ //msg (client-ack), at the same time, simulate the server side connection failure to trigger server side
+ //clean up. This will create a possibility that when not properly synchronized, the above
+ //described issue may happen. At the end check the message count, it should always be zero.
+ public void testMessageCountOnConnectionFailure() throws Exception
+ {
+ ConnectionFactory cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ JBossConnection conn1 = null;
+ JBossConnection conn2 = null;
+
+ try
+ {
+ conn1 = (JBossConnection)cf.createConnection();
+ conn1.start();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ //now send messages
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ final int NUM_MSG = 2000;
+ for (int i = 0; i < NUM_MSG; ++i)
+ {
+ TextMessage tm = sess1.createTextMessage("-m"+i);
+ prod1.send(tm);
+ }
+
+ //receive the messages
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ for (int j = 0; j < NUM_MSG-1; ++j)
+ {
+ TextMessage rm = (TextMessage)cons1.receive(2000);
+ assertNotNull(rm);
+ assertEquals("-m"+j, rm.getText());
+ }
+
+ //last message
+ TextMessage lastRm = (TextMessage)cons1.receive(2000);
+ assertNotNull(lastRm);
+ assertEquals("-m"+(NUM_MSG-1), lastRm.getText());
+
+ final ServerClientFailureCommand cmd = new ServerClientFailureCommand();
+
+ Thread exeThr = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ServerManagement.getServer().executeCommand(cmd);
+ }
+ catch (Exception e)
+ {
+ log.error("failed to invoke command", e);
+ fail("failure in executing command.");
+ }
+ }
+ };
+
+ exeThr.start();
+
+ //ack last message, making server side ack happening.
+ lastRm.acknowledge();
+
+ //receive possible canceled messages
+ TextMessage prm = null;
+ conn2 = (JBossConnection)cf.createConnection();
+ conn2.start();
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons2 = sess2.createConsumer(queue1);
+ prm = (TextMessage)cons2.receive(2000);
+ while (prm != null)
+ {
+ prm = (TextMessage)cons2.receive(2000);
+ }
+
+ //check message count
+ //tearDown will do the check.
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ public static class ServerClientFailureCommand implements Command
+ {
+
+ private static final long serialVersionUID = 2603154447586447658L;
+
+ public Object execute(Server server) throws Exception
+ {
+ ServerPeer peer = server.getServerPeer();
+
+ SimpleConnectionManager cm = (SimpleConnectionManager)peer.getConnectionManager();
+
+ Map jmsClients = cm.getClients();
+ assertEquals(1, jmsClients.size());
+ Map endpoints = (Map)jmsClients.values().iterator().next();
+ assertEquals(1, endpoints.size());
+ Map.Entry entry = (Map.Entry)endpoints.entrySet().iterator().next();
+ String sessId = (String)entry.getKey();
+
+ // triggering server side clean up
+ cm.handleClientFailure(sessId);
+ return null;
+ }
+
+ }
+}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2009-05-28 23:53:30 UTC (rev 7120)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1397_JBMESSAGING_1440_JBMESSAGING_1456/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2009-05-29 01:55:48 UTC (rev 7121)
@@ -3247,5 +3247,215 @@
}
}
}
+
+ /*
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+ */
+ public void testReceivingMessageOfPreparedTransaction() throws Exception
+ {
+ log.trace("starting testReceivingMessageOfPreparedTransaction");
+
+ Connection conn1 = null;
+
+ XAConnection conn2 = null;
+
+ XAConnection conn3 = null;
+
+ XAConnection conn4 = null;
+
+ Connection conn5 = null;
+
+ try
+ {
+ //First send a message to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess1.createProducer(queue4);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage tm1 = sess1.createTextMessage("TxM1");
+
+ prod.send(tm1);
+
+ conn2 = cf.createXAConnection();
+
+ XASession sess2 = conn2.createXASession();
+
+ XAResource res1 = sess2.getXAResource();
+
+ //Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("tx1".getBytes(), 42, "abcdef".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = sess2.createConsumer(queue4);
+
+ conn2.start();
+
+ //Consume the message
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ //prepare the tx
+
+ res1.prepare(xid1);
+
+ conn1.close();
+
+ conn2.close();
+
+ conn1 = null;
+
+ conn2 = null;
+
+ //Now receive again
+
+ conn3 = cf.createXAConnection();
+
+ XASession sess3 = conn3.createXASession();
+
+ XAResource res3 = sess3.getXAResource();
+
+ Xid xid2 = new MessagingXid("tx2".getBytes(), 42, "ghijkl".getBytes());
+
+ res3.start(xid2, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue4);
+
+ conn3.start();
+
+ //Consume the message
+
+ TextMessage rm3 = (TextMessage)cons3.receive(3000);
+
+ assertNull(rm3);
+
+ res3.end(xid2, XAResource.TMSUCCESS);
+
+ res3.prepare(xid2);
+ res3.commit(xid2, false);
+
+ conn3.close();
+ conn3 = null;
+
+ //now recover the lost message
+ ServerManagement.stopServerPeer();
+
+ ServerManagement.startServerPeer();
+
+ deployAndLookupAdministeredObjects();
+
+ //Now recover
+
+ conn4 = cf.createXAConnection();
+
+ XASession sess4 = conn4.createXASession();
+
+ XAResource res4 = sess4.getXAResource();
+
+ Xid[] xids = res4.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+
+ Xid[] xids2 = res4.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, xids2.length);
+
+ assertEquals(xid1, xids[0]);
+
+ //commit the tx
+
+ res4.commit(xids[0], false);
+
+ //The message should never be received again
+
+ conn4.close();
+ conn4 = null;
+
+ conn5 = cf.createConnection();
+
+ Session sess5 = conn5.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons5 = sess5.createConsumer(queue4);
+
+ conn5.start();
+
+ Message m = cons5.receive(1000);
+
+ assertNull(m);
+
+ conn5.close();
+ conn5 = null;
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn2 != null)
+ {
+ try
+ {
+ conn2.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn3 != null)
+ {
+ try
+ {
+ conn3.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn4 != null)
+ {
+ try
+ {
+ conn4.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn5 != null)
+ {
+ try
+ {
+ conn5.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+ }
+ }
+
}
More information about the jboss-cvs-commits
mailing list