Author: clebert.suconic(a)jboss.com
Date: 2012-01-02 08:49:15 -0500 (Mon, 02 Jan 2012)
New Revision: 11943
Added:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java
Modified:
branches/Branch_2_2_AS7/build-hornetq.xml
branches/Branch_2_2_AS7/build-maven.xml
branches/Branch_2_2_AS7/docs/user-manual/en/clusters.xml
branches/Branch_2_2_AS7/docs/user-manual/en/configuration-index.xml
branches/Branch_2_2_AS7/docs/user-manual/en/paging.xml
branches/Branch_2_2_AS7/src/config/common/schema/hornetq-configuration.xsd
branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/hornetq-configuration.xml
branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/ra.xml
branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/ra.xml
branches/Branch_2_2_AS7/src/config/jboss-as-5/clustered/hornetq-configuration.xml
branches/Branch_2_2_AS7/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
branches/Branch_2_2_AS7/src/config/jboss-as-6/clustered/hornetq-configuration.xml
branches/Branch_2_2_AS7/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
branches/Branch_2_2_AS7/src/config/ra.xml
branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/TopicControl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/Configuration.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/SequentialFileFactory.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PrintPages.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAProperties.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java
branches/Branch_2_2_AS7/tests/config/ConfigurationTest-full-config.xml
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/InterceptorTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/jms/misc/ManifestTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/ServiceTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Merge of changes from EAP
Modified: branches/Branch_2_2_AS7/build-hornetq.xml
===================================================================
--- branches/Branch_2_2_AS7/build-hornetq.xml 2011-12-23 16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/build-hornetq.xml 2012-01-02 13:49:15 UTC (rev 11943)
@@ -45,7 +45,7 @@
<property file="src/config/common/hornetq-version.properties"/>
<property name="hornetq.version.revision" value="0" />
<property name="twitter.consumerKey" value="null"/>
- <property name="hornetq.version.svnurl"
value="https://svn.jboss.org/repos/hornetq/branches/Branch_2_2_EAP&q...
+ <property name="hornetq.version.svnurl"
value="https://svn.jboss.org/repos/hornetq/branches/Branch_2_2_AS7&q...
<property name="hornetq.version.string"
value="${hornetq.version.majorVersion}.${hornetq.version.minorVersion}.${hornetq.version.microVersion}.${hornetq.version.versionSuffix}
(${hornetq.version.versionName}, ${hornetq.version.incrementingVersion})"/>
<property name="module.version"
@@ -90,7 +90,6 @@
<property name="resources.jar.name"
value="hornetq-resources.jar"/>
<property name="resources.sources.jar.name"
value="hornetq-resources-sources.jar"/>
<property name="twitter4j.jar.name"
value="twitter4j-core.jar"/>
- <property name="eap.examples.zip.name"
value="hornetq-eap-examples.zip"/>
<!--source and build dirs-->
<property name="build.dir" value="build"/>
@@ -794,7 +793,7 @@
</target>
<target name="jar"
- depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms,
jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service,
jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar,
jar-twitter-integration, jar-spring-integration, jar-rest, eap-examples">
+ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms,
jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service,
jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar,
jar-twitter-integration, jar-spring-integration, jar-rest">
</target>
<target name="jar-jnp-client" depends="init">
@@ -1410,29 +1409,6 @@
destfile="${build.dir}/${build.artifact}.tar.gz"/>
</target>
- <target name="eap-examples" description="Generates a file with
examples tuned for the JBoss EAP" depends="init">
- <mkdir dir="${build.dir}/eap-examples-tmp"/>
-
-
- <copy todir="${build.dir}/eap-examples-tmp">
- <fileset dir="${examples.dir}"
excludes="**/build.sh,**/build.bat, **/twitter-connector/**"/>
- </copy>
-
- <copy todir="${build.dir}/eap-examples-tmp"
overwrite="true">
- <fileset dir="examples-eap"/>
- </copy>
-
- <replace dir="${build.dir}/eap-examples-tmp"
token="hornetq-ra.rar" value="jms-ra.rar"></replace>
-
- <zip destfile="${build.jars.dir}/${eap.examples.zip.name}">
- <zipfileset dir="${build.dir}/eap-examples-tmp"
prefix="examples/hornetq"/>
- </zip>
-
- <delete dir="${build.dir}/eap-examples-tmp"/>
- </target>
-
-
-
<target name="source-distro">
<mkdir dir="${build.dir}"/>
<zip destfile="${build.dir}/${build.artifact}-src.zip">
Modified: branches/Branch_2_2_AS7/build-maven.xml
===================================================================
--- branches/Branch_2_2_AS7/build-maven.xml 2011-12-23 16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/build-maven.xml 2012-01-02 13:49:15 UTC (rev 11943)
@@ -17,6 +17,10 @@
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
+ <condition property="maven.executable" value="mvn.bat"
else="mvn">
+ <os family="windows"/>
+ </condition>
+
<target name="uploadHornetQBootstrap">
<antcall target="upload">
<param name="artifact.id" value="hornetq-bootstrap"/>
@@ -162,10 +166,10 @@
<target name="upload-local-target">
<!-- install the jar -->
- <exec executable="mvn" dir="./build">
+ <exec executable="${maven.executable}" dir="./build">
<arg value="install:install-file"/>
<!-- uncomment the following line to deploy to the JBoss 5 repository -->
- <!-- arg value="-Dmaven.repo.local=/work/eap/maven-repository"/
-->
+ <!-- <arg
value="-Dmaven.repo.local=/work/eap-51/maven-repository"/> -->
<arg value="-DgroupId=org.hornetq"/>
<arg value="-DartifactId=${artifact.id}"/>
<arg value="-Dversion=${hornetq.version}"/>
@@ -173,7 +177,7 @@
<arg value="-Dfile=./jars/${file-name}.jar"/>
</exec>
<!-- install the sources jar -->
- <exec executable="mvn" dir="./build">
+ <exec executable="${maven.executable}" dir="./build">
<arg value="install:install-file"/>
<arg value="-DgroupId=org.hornetq"/>
<!-- uncomment the following line to deploy to the JBoss 5 repository -->
@@ -188,7 +192,7 @@
<target name="upload">
<!-- upload the jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="deploy:deploy-file"/>
<arg value="-e"/>
<arg value="-DgroupId=org.hornetq"/>
@@ -201,7 +205,7 @@
<arg
value="-Durl=dav:https://snapshots.jboss.org/maven2"/>
</exec>
<!-- upload the corresponding sources jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="deploy:deploy-file"/>
<arg value="-e"/>
<arg value="-DgroupId=org.hornetq"/>
@@ -310,7 +314,7 @@
</project>"/>
<!-- deploy the jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="-e"/>
<arg value="deploy:deploy-file"/>
<arg value="-DpomFile=${temporary.pom}"/>
@@ -323,7 +327,7 @@
<arg
value="-Durl=https://repository.jboss.org/nexus/service/local/stagin...
</exec>
<!-- deploy the sources jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="-e"/>
<arg value="deploy:deploy-file"/>
<arg value="-DpomFile=${temporary.pom}"/>
Modified: branches/Branch_2_2_AS7/docs/user-manual/en/clusters.xml
===================================================================
--- branches/Branch_2_2_AS7/docs/user-manual/en/clusters.xml 2011-12-23 16:54:27 UTC (rev
11942)
+++ branches/Branch_2_2_AS7/docs/user-manual/en/clusters.xml 2012-01-02 13:49:15 UTC (rev
11943)
@@ -521,6 +521,16 @@
<para>This parameter is optional and its default is at -1
(infinite retries).</para>
</listitem>
<listitem>
+ <para><literal>min-large-message-size</literal>.
This parameter determines when a
+ message should be split into multiple packets when
sent over the cluster.&lt;/para&gt;
+ <para>This parameter is optional and its default is at
100K.</para>
+ </listitem>
+ <listitem>
+
&lt;para&gt;&lt;literal&gt;reconnect-attempts&lt;/literal&gt;. The number of times
the system will
+ try to connect to a node on the cluster. If the maximum number of retries is
reached, this node will be considered permanently down and the system will stop routing
messages to this node.&lt;/para&gt;
+ <para>This parameter is optional and its default is at -1
(infinite retries).</para>
+ </listitem>
+ <listitem>
<para><literal>max-hops</literal>. When a cluster
connection decides the set of
nodes to which it might load balance a message, those nodes do
not have to
be directly connected to it via a cluster connection. HornetQ can
be
Modified: branches/Branch_2_2_AS7/docs/user-manual/en/configuration-index.xml
===================================================================
--- branches/Branch_2_2_AS7/docs/user-manual/en/configuration-index.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/docs/user-manual/en/configuration-index.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -319,6 +319,13 @@
<entry>data/paging</entry>
</row>
<row>
+ <entry><link linkend="paging.main.config"
+ >page-max-concurrent-io</link></entry>
+ <entry>integer</entry>
+ <entry>The maximum number of concurrent reads the
system will do on the paging files</entry>
+ <entry>5</entry>
+ </row>
+ <row>
<entry><link
linkend="configuring.delivery.count.persistence">
persist-delivery-count-before-delivery</link></entry>
<entry>Boolean</entry>
Modified: branches/Branch_2_2_AS7/docs/user-manual/en/paging.xml
===================================================================
--- branches/Branch_2_2_AS7/docs/user-manual/en/paging.xml 2011-12-23 16:54:27 UTC (rev
11942)
+++ branches/Branch_2_2_AS7/docs/user-manual/en/paging.xml 2012-01-02 13:49:15 UTC (rev
11943)
@@ -52,6 +52,7 @@
...
<paging-directory>/somewhere/paging-directory</paging-directory>
+ &lt;page-max-concurrent-io&gt;5&lt;/page-max-concurrent-io&gt;
... </programlisting>
<para>
@@ -75,6 +76,12 @@
each address being paged under this configured
location.</entry>
<entry>data/paging</entry>
</row>
+ <row>
+
<entry><literal>page-max-concurrent-io</literal></entry>
+ <entry>The maximum number of concurrent reads the
system can make on paged files. You may increase this parameter depending on
+ the expected number of paged destinations and the
limits you have on your disk</entry>
+ <entry>5</entry>
+ </row>
</tbody>
</tgroup>
</table>
Modified: branches/Branch_2_2_AS7/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/Branch_2_2_AS7/src/config/common/schema/hornetq-configuration.xsd 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/config/common/schema/hornetq-configuration.xsd 2012-01-02
13:49:15 UTC (rev 11943)
@@ -146,6 +146,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="create-bindings-dir" type="xsd:boolean">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="page-max-concurrent-io" type="xsd:string">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="journal-directory" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="create-journal-dir" type="xsd:boolean">
Modified:
branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/hornetq-configuration.xml
===================================================================
---
branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/ra.xml
===================================================================
--- branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/ra.xml 2011-12-23 16:54:27 UTC
(rev 11942)
+++ branches/Branch_2_2_AS7/src/config/jboss-as-4/clustered/ra.xml 2012-01-02 13:49:15 UTC
(rev 11943)
@@ -271,19 +271,19 @@
<config-property>
<description>whether to use jndi for looking up destinations
etc</description>
<config-property-name>UseJNDI</config-property-name>
- <config-property-type>boolean</config-property-type>
+ <config-property-type>java.lang.Boolean</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>how long in milliseconds to wait before retry on failed MDB
setup</description>
<config-property-name>SetupInterval</config-property-name>
- <config-property-type>long</config-property-type>
+ <config-property-type>java.lang.Long</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>How many attempts should be made when connecting the
MDB</description>
<config-property-name>SetupAttempts</config-property-name>
- <config-property-type>int</config-property-type>
+ <config-property-type>java.lang.Integer</config-property-type>
<config-property-value></config-property-value>
</config-property>-->
Modified:
branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
===================================================================
---
branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/hornetq-configuration.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/ra.xml
===================================================================
--- branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/ra.xml 2011-12-23 16:54:27
UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/config/jboss-as-4/non-clustered/ra.xml 2012-01-02 13:49:15
UTC (rev 11943)
@@ -271,7 +271,7 @@
<config-property>
<description>whether to use jndi for looking up destinations
etc</description>
<config-property-name>UseJNDI</config-property-name>
- <config-property-type>boolean</config-property-type>
+ <config-property-type>java.lang.Boolean</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
Modified:
branches/Branch_2_2_AS7/src/config/jboss-as-5/clustered/hornetq-configuration.xml
===================================================================
---
branches/Branch_2_2_AS7/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified:
branches/Branch_2_2_AS7/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
===================================================================
---
branches/Branch_2_2_AS7/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/config/jboss-as-5/non-clustered/hornetq-configuration.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified:
branches/Branch_2_2_AS7/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
---
branches/Branch_2_2_AS7/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified:
branches/Branch_2_2_AS7/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
===================================================================
---
branches/Branch_2_2_AS7/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/config/jboss-as-6/non-clustered/hornetq-configuration.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
Modified: branches/Branch_2_2_AS7/src/config/ra.xml
===================================================================
--- branches/Branch_2_2_AS7/src/config/ra.xml 2011-12-23 16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/config/ra.xml 2012-01-02 13:49:15 UTC (rev 11943)
@@ -8,8 +8,8 @@
http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd"
version="1.5">
- <description>HornetQ 2.0 Resource Adapter</description>
- <display-name>HornetQ 2.0 Resource Adapter</display-name>
+ <description>HornetQ 2.2 Resource Adapter</description>
+ <display-name>HornetQ 2.2 Resource Adapter</display-name>
<vendor-name>Red Hat Middleware LLC</vendor-name>
<eis-type>JMS 1.1 Server</eis-type>
@@ -246,19 +246,19 @@
<config-property>
<description>whether to use jndi for looking up destinations
etc</description>
<config-property-name>UseJNDI</config-property-name>
- <config-property-type>boolean</config-property-type>
+ <config-property-type>java.lang.Boolean</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>how long in milliseconds to wait before retry on failed MDB
setup</description>
<config-property-name>SetupInterval</config-property-name>
- <config-property-type>long</config-property-type>
+ <config-property-type>java.lang.Long</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>How many attempts should be made when connecting the
MDB</description>
<config-property-name>SetupAttempts</config-property-name>
- <config-property-type>int</config-property-type>
+ <config-property-type>java.lang.Integer</config-property-type>
<config-property-value></config-property-value>
</config-property>-->
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/management/ClusterConnectionControl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -49,6 +49,10 @@
boolean isForwardWhenNoConsumers();
/**
+ * Return the Topology that this Cluster Connection knows about
+ */
+ String getTopology();
+ /**
* Returns the maximum number of hops used by this cluster connection.
*/
int getMaxHops();
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -77,6 +77,10 @@
*/
@Operation(desc = "Adds the queue to another JNDI binding")
void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the
binding for JNDI") String jndi) throws Exception;
+
+ @Operation(desc = "Adds the queue to another JNDI binding")
+ void removeJNDI(@Parameter(name = "jndiBinding", desc = "the name of
the binding for JNDI") String jndi) throws Exception;
+
/**
* Lists all the JMS messages in this queue matching the specified filter.
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/TopicControl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/TopicControl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/api/jms/management/TopicControl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -13,7 +13,6 @@
package org.hornetq.api.jms.management;
-import java.util.List;
import java.util.Map;
import javax.management.MBeanOperationInfo;
@@ -64,8 +63,13 @@
/**
* Add the JNDI binding to this destination
*/
- @Operation(desc = "Adds the queue to another JNDI binding")
+ @Operation(desc = "Add the queue to another JNDI binding")
void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the
binding for JNDI") String jndi) throws Exception;
+
+ @Operation(desc = "Add the queue to another JNDI binding")
+ void removeJNDI(@Parameter(name = "jndiBinding", desc = "the name of
the binding for JNDI") String jndi) throws Exception;
+
+
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -76,6 +76,13 @@
private final boolean browseOnly;
private final Executor sessionExecutor;
+
+ // For failover we can't send credits back
+ // while holding a lock, or failover could eventually deadlock.
+ // And we can't use the sessionExecutor as that's being used for message
handlers.
+ // For that reason we have a separate flowControlExecutor that's using the thread
pool,
+ // which is an OrderedExecutor
+ private final Executor flowControlExecutor;
private final int clientWindowSize;
@@ -104,6 +111,8 @@
private volatile boolean closed;
private volatile int creditsToSend;
+
+ private volatile boolean failedOver;
private volatile Exception lastException;
@@ -133,6 +142,7 @@
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
final Executor executor,
+ final Executor flowControlExecutor,
final Channel channel,
final SessionQueueQueryResponseMessage queueInfo,
final ClassLoader contextClassLoader)
@@ -160,12 +170,14 @@
this.queueInfo = queueInfo;
this.contextClassLoader = contextClassLoader;
+
+ this.flowControlExecutor = flowControlExecutor;
}
// ClientConsumer implementation
// -----------------------------------------------------------------
- private ClientMessage receive(long timeout, final boolean forcingDelivery) throws
HornetQException
+ private ClientMessage receive(final long timeout, final boolean forcingDelivery)
throws HornetQException
{
checkClosed();
@@ -194,17 +206,14 @@
receiverThread = Thread.currentThread();
- if (timeout == 0)
- {
- // Effectively infinite
- timeout = Long.MAX_VALUE;
- }
-
+ // Tracks whether forceDelivery was already called during this receive
boolean deliveryForced = false;
+ // Set when forceDelivery needs to be called; the call itself is made later, outside of the lock
+ boolean callForceDelivery = false;
long start = -1;
- long toWait = timeout;
+ long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
try
{
@@ -231,13 +240,8 @@
// we only force delivery once per call to receive
if (!deliveryForced)
{
- if (isTrace)
- {
- log.trace("Forcing delivery");
- }
- session.forceDelivery(id, forceDeliveryCount++);
-
- deliveryForced = true;
+ callForceDelivery = true;
+ break;
}
}
@@ -262,6 +266,35 @@
}
}
+ if (failedOver)
+ {
+ if (m == null)
+ {
+ // if failed over and the buffer is null, we reset the state and try it
again
+ failedOver = false;
+ deliveryForced = false;
+ toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
+ continue;
+ }
+ else
+ {
+ failedOver = false;
+ }
+ }
+
+ if (callForceDelivery)
+ {
+ if (isTrace)
+ {
+ log.trace("Forcing delivery");
+ }
+ // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid
distributed dead locks
+ session.forceDelivery(id, forceDeliveryCount++);
+ callForceDelivery = false;
+ deliveryForced = true;
+ continue;
+ }
+
if (m != null)
{
session.workDone();
@@ -302,7 +335,7 @@
if (expired)
{
m.discardBody();
-
+
session.expire(id, m.getMessageID());
if (clientWindowSize == 0)
@@ -351,19 +384,14 @@
public ClientMessage receive(final long timeout) throws HornetQException
{
- if (isBrowseOnly())
+ ClientMessage msg = receive(timeout, false);
+
+ if (msg == null && !closed)
{
- ClientMessage msg = receive(timeout, false);
- if (msg == null)
- {
- msg = receive(0, true);
- }
- return msg;
+ msg = receive(0, true);
}
- else
- {
- return receive(timeout, false);
- }
+
+ return msg;
}
public ClientMessage receive() throws HornetQException
@@ -465,6 +493,8 @@
lastAckedMessage = null;
creditsToSend = 0;
+
+ failedOver = true;
ackIndividually = false;
}
@@ -566,10 +596,10 @@
// Flow control for the first packet, we will have others
- flowControl(packet.getPacketSize(), false);
-
ClientLargeMessageInternal currentChunkMessage =
(ClientLargeMessageInternal)packet.getLargeMessage();
+ currentChunkMessage.setFlowControlSize(packet.getPacketSize());
+
currentChunkMessage.setDeliveryCount(packet.getDeliveryCount());
File largeMessageCache = null;
@@ -592,8 +622,6 @@
currentChunkMessage.setLargeMessageController(currentLargeMessageController);
}
- currentChunkMessage.setFlowControlSize(0);
-
handleMessage(currentChunkMessage);
}
@@ -726,11 +754,6 @@
{
creditsToSend += messageBytes;
- if (log.isTraceEnabled())
- {
- log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend +
", clientWindowSize = " + clientWindowSize + " messageBytes = " +
messageBytes);
- }
-
if (creditsToSend >= clientWindowSize)
{
if (clientWindowSize == 0 && discountSlowConsumer)
@@ -753,9 +776,9 @@
}
else
{
- if (ClientConsumerImpl.trace)
+ if (log.isDebugEnabled())
{
- ClientConsumerImpl.log.trace("Sending " + messageBytes +
" from flow-control");
+ ClientConsumerImpl.log.debug("Sending " + messageBytes +
" from flow-control");
}
final int credits = creditsToSend;
@@ -826,7 +849,13 @@
*/
private void sendCredits(final int credits)
{
- channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ flowControlExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ }
+ });
}
private void waitForOnMessageToComplete(boolean waitForOnMessage)
@@ -887,6 +916,8 @@
{
rateLimiter.limit();
}
+
+ failedOver = false;
synchronized (this)
{
@@ -975,7 +1006,8 @@
// Chunk messages will execute the flow control while receiving the chunks
if (message.getFlowControlSize() != 0)
{
- flowControl(message.getFlowControlSize(), true);
+ // on large messages we should discount 1 on the first packets as we need
continuity until the last packet
+ flowControl(message.getFlowControlSize(), !message.isLargeMessage());
}
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -50,44 +50,51 @@
public ClientProducerCredits getCredits(final SimpleString address, final boolean
anon)
{
- boolean needInit = false;
- ClientProducerCredits credits;
-
- synchronized(this)
+ if (windowSize == -1)
{
- credits = producerCredits.get(address);
-
- if (credits == null)
+ return ClientProducerCreditsNoFlowControl.instance;
+ }
+ else
+ {
+ boolean needInit = false;
+ ClientProducerCredits credits;
+
+ synchronized(this)
{
- // Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, address, windowSize);
- needInit = true;
-
- producerCredits.put(address, credits);
+ credits = producerCredits.get(address);
+
+ if (credits == null)
+ {
+ // Doesn't need to be fair since session is single threaded
+ credits = new ClientProducerCreditsImpl(session, address, windowSize);
+ needInit = true;
+
+ producerCredits.put(address, credits);
+ }
+
+ if (!anon)
+ {
+ credits.incrementRefCount();
+
+ // Remove from anon credits (if there)
+ unReferencedCredits.remove(address);
+ }
+ else
+ {
+ addToUnReferencedCache(address, credits);
+ }
}
-
- if (!anon)
+
+ // The init is done outside of the lock
+ // otherwise packages may arrive with flow control
+ // while this is still sending requests causing a dead lock
+ if (needInit)
{
- credits.incrementRefCount();
+ credits.init();
+ }
- // Remove from anon credits (if there)
- unReferencedCredits.remove(address);
- }
- else
- {
- addToUnReferencedCache(address, credits);
- }
+ return credits;
}
-
- // The init is done outside of the lock
- // otherwise packages may arrive with flow control
- // while this is still sending requests causing a dead lock
- if (needInit)
- {
- credits.init();
- }
-
- return credits;
}
public synchronized void returnCredits(final SimpleString address)
@@ -166,5 +173,50 @@
credits.close();
}
+
+
+ /**
+  * No-op {@link ClientProducerCredits} returned by getCredits when windowSize is -1.
+  * No method does any work: acquiring credits returns immediately and the
+  * producer is never reported as blocked.
+  */
+ static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits
+ {
+    /** Shared instance; the class holds no state, so a single instance is handed out for every address. */
+    static final ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
+
+    /** Returns immediately; there is no credit window to wait on. */
+    public void acquireCredits(int credits) throws InterruptedException
+    {
+    }
+
+    /** Ignores the credits; nothing is tracked. */
+    public void receiveCredits(int credits)
+    {
+    }
+
+    /** @return always false */
+    public boolean isBlocked()
+    {
+       return false;
+    }
+
+    public void init()
+    {
+    }
+
+    public void reset()
+    {
+    }
+
+    public void close()
+    {
+    }
+
+    public void incrementRefCount()
+    {
+    }
+
+    /**
+     * @return always 1
+     */
+    public int decrementRefCount()
+    {
+       // NOTE(review): presumably a constant non-zero count keeps callers from treating
+       // the shared instance as unreferenced - confirm against returnCredits
+       return 1;
+    }
+
+    public void releaseOutstanding()
+    {
+    }
+
+ }
+
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -672,9 +672,10 @@
}
else
{
- if (connection != null)
+ CoreRemotingConnection connectionToDestory = connection;
+ if (connectionToDestory != null)
{
- connection.destroy();
+ connectionToDestory.destroy();
}
connection = null;
@@ -831,6 +832,7 @@
connection,
response.getServerVersion(),
sessionChannel,
+
orderedExecutorFactory.getExecutor(),
orderedExecutorFactory.getExecutor());
synchronized (sessions)
@@ -922,6 +924,17 @@
*/
private void reconnectSessions(final CoreRemotingConnection oldConnection, final int
reconnectAttempts)
{
+ HashSet<ClientSessionInternal> sessionsToFailover;
+ synchronized (sessions)
+ {
+ sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
+ }
+
+ for (ClientSessionInternal session : sessionsToFailover)
+ {
+ session.preHandleFailover(connection);
+ }
+
getConnectionWithRetry(reconnectAttempts);
if (connection == null)
@@ -947,12 +960,6 @@
connection.setFailureListeners(newListeners);
- HashSet<ClientSessionInternal> sessionsToFailover;
- synchronized (sessions)
- {
- sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
- }
-
for (ClientSessionInternal session : sessionsToFailover)
{
session.handleFailover(connection);
@@ -969,7 +976,7 @@
" multiplier = " +
retryIntervalMultiplier, new
Exception("trace"));
}
-
+
long interval = retryInterval;
int count = 0;
@@ -1043,6 +1050,10 @@
}
else
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Reconnection successfull");
+ }
return;
}
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -129,6 +129,9 @@
private final boolean xa;
private final Executor executor;
+
+ // passed on to the consumers, as consumers need a separate executor for flow control
+ private final Executor flowControlExecutor;
private volatile CoreRemotingConnection remotingConnection;
@@ -228,7 +231,8 @@
final CoreRemotingConnection remotingConnection,
final int version,
final Channel channel,
- final Executor executor) throws HornetQException
+ final Executor executor,
+ final Executor flowControlExecutor) throws HornetQException
{
this.sessionFactory = sessionFactory;
@@ -241,6 +245,8 @@
this.remotingConnection = remotingConnection;
this.executor = executor;
+
+ this.flowControlExecutor = flowControlExecutor;
this.xa = xa;
@@ -398,15 +404,8 @@
{
checkClosed();
- // JBPAPP-6030 - Using the executor to avoid distributed dead locks
- executor.execute(new Runnable()
- {
- public void run()
- {
- SessionForceConsumerDelivery request = new
SessionForceConsumerDelivery(consumerID, sequence);
- channel.send(request);
- }
- });
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID,
sequence);
+ channel.send(request);
}
public ClientConsumer createConsumer(final SimpleString queueName) throws
HornetQException
@@ -594,8 +593,9 @@
stop();
}
+
// We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clear(true);
}
@@ -672,7 +672,7 @@
if (!started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
clientConsumerInternal.start();
}
@@ -694,7 +694,7 @@
if (started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
clientConsumerInternal.stop(waitForOnMessage);
}
@@ -756,7 +756,10 @@
}
checkClosed();
-
+ if (log.isDebugEnabled())
+ {
+ log.debug("client ack messageID = " + messageID);
+ }
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID,
messageID, blockOnAcknowledge);
if (blockOnAcknowledge)
@@ -816,7 +819,10 @@
public void addProducer(final ClientProducerInternal producer)
{
- producers.add(producer);
+ synchronized (producers)
+ {
+ producers.add(producer);
+ }
}
public void removeConsumer(final ClientConsumerInternal consumer) throws
HornetQException
@@ -829,12 +835,15 @@
public void removeProducer(final ClientProducerInternal producer)
{
- producers.remove(producer);
+ synchronized (producers)
+ {
+ producers.remove(producer);
+ }
}
public void handleReceiveMessage(final long consumerID, final SessionReceiveMessage
message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -850,7 +859,7 @@
public void handleReceiveLargeMessage(final long consumerID, final
SessionReceiveLargeMessage message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -860,7 +869,7 @@
public void handleReceiveContinuation(final long consumerID, final
SessionReceiveContinuationMessage continuation) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -928,6 +937,14 @@
sendAckHandler = handler;
}
+ public void preHandleFailover(CoreRemotingConnection connection)
+ {
+ // We lock the channel to prevent any packets from being added to the resend
+ // cache during the failover process.
+ // We also do this before the connection fails over, to give the session a chance to block for failover.
+ channel.lock();
+ }
+
// Needs to be synchronized to prevent issues with occurring concurrently with
close()
public void handleFailover(final CoreRemotingConnection backupConnection)
@@ -941,9 +958,6 @@
boolean resetCreditManager = false;
- // We lock the channel to prevent any packets to be added to the resend
- // cache during the failover process
- channel.lock();
try
{
channel.transferConnection(backupConnection);
@@ -1090,7 +1104,7 @@
// Now start the session if it was already started
if (started)
{
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clearAtFailover();
consumer.start();
@@ -1527,7 +1541,7 @@
}
// We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clear(false);
}
@@ -1787,6 +1801,7 @@
false)
:
null,
executor,
+ flowControlExecutor,
channel,
queueInfo,
lookupTCCL());
@@ -1871,6 +1886,19 @@
});
}
+
+ /**
+ * @param consumerID
+ * @return
+ */
+ private ClientConsumerInternal getConsumer(final long consumerID)
+ {
+ synchronized (consumers)
+ {
+ ClientConsumerInternal consumer = consumers.get(consumerID);
+ return consumer;
+ }
+ }
private void doCleanup(boolean failingOver)
{
@@ -1900,14 +1928,14 @@
private void cleanUpChildren() throws Exception
{
- Set<ClientConsumerInternal> consumersClone = new
HashSet<ClientConsumerInternal>(consumers.values());
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumerInternal consumer : consumersClone)
{
consumer.cleanUp();
}
- Set<ClientProducerInternal> producersClone = new
HashSet<ClientProducerInternal>(producers);
+ Set<ClientProducerInternal> producersClone = cloneProducers();
for (ClientProducerInternal producer : producersClone)
{
@@ -1915,16 +1943,41 @@
}
}
+ /**
+ * @return
+ */
+ private Set<ClientProducerInternal> cloneProducers()
+ {
+ Set<ClientProducerInternal> producersClone;
+
+ synchronized (producers)
+ {
+ producersClone = new HashSet<ClientProducerInternal>(producers);
+ }
+ return producersClone;
+ }
+
+ /**
+ * @return
+ */
+ private Set<ClientConsumerInternal> cloneConsumers()
+ {
+ synchronized (consumers)
+ {
+ return new HashSet<ClientConsumerInternal>(consumers.values());
+ }
+ }
+
private void closeChildren() throws HornetQException
{
- Set<ClientConsumer> consumersClone = new
HashSet<ClientConsumer>(consumers.values());
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumer consumer : consumersClone)
{
consumer.close();
}
- Set<ClientProducer> producersClone = new
HashSet<ClientProducer>(producers);
+ Set<ClientProducerInternal> producersClone = cloneProducers();
for (ClientProducer producer : producersClone)
{
@@ -1934,12 +1987,9 @@
private void flushAcks() throws HornetQException
{
- synchronized (consumers)
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.flushAcks();
- }
+ consumer.flushAcks();
}
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -59,6 +59,8 @@
void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage
continuation) throws Exception;
+ void preHandleFailover(CoreRemotingConnection connection);
+
void handleFailover(CoreRemotingConnection backupConnection);
RemotingConnection getConnection();
@@ -92,5 +94,4 @@
void setPacketSize(int packetSize);
void resetIfNeeded() throws HornetQException;
-
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -373,6 +373,11 @@
return session.getXAResource();
}
+ public void preHandleFailover(CoreRemotingConnection connection)
+ {
+ session.preHandleFailover(connection);
+ }
+
public void handleFailover(final CoreRemotingConnection backupConnection)
{
session.handleFailover(backupConnection);
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -59,6 +59,7 @@
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener,
Serializable
{
/*needed for backward compatibility*/
+ @SuppressWarnings("unused")
private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
/*end of compatibility fixes*/
@@ -83,7 +84,7 @@
private final Set<ClientSessionFactoryInternal> connectingFactories = new
HashSet<ClientSessionFactoryInternal>();
- private TransportConfiguration[] initialConnectors;
+ private volatile TransportConfiguration[] initialConnectors;
private DiscoveryGroupConfiguration discoveryGroupConfiguration;
@@ -573,12 +574,15 @@
public ClientSessionFactoryInternal connect() throws Exception
{
- // static list of initial connectors
- if (initialConnectors != null && discoveryGroup == null)
+ synchronized (this)
{
- ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)staticConnector.connect();
- addFactory(sf);
- return sf;
+ // static list of initial connectors
+ if (initialConnectors != null && discoveryGroup == null)
+ {
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)staticConnector.connect();
+ addFactory(sf);
+ return sf;
+ }
}
// wait for discovery group to get the list of initial connectors
return (ClientSessionFactoryInternal)createSessionFactory();
@@ -675,7 +679,10 @@
private void removeFromConnecting(ClientSessionFactoryInternal factory)
{
- connectingFactories.remove(factory);
+ synchronized (connectingFactories)
+ {
+ connectingFactories.remove(factory);
+ }
}
private void addToConnecting(ClientSessionFactoryInternal factory)
@@ -1272,23 +1279,24 @@
connectingFactories.clear();
}
+ Set<ClientSessionFactoryInternal> clonedFactory;
synchronized (factories)
{
- Set<ClientSessionFactoryInternal> clonedFactory = new
HashSet<ClientSessionFactoryInternal>(factories);
+ clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
- for (ClientSessionFactory factory : clonedFactory)
+ factories.clear();
+ }
+
+ for (ClientSessionFactory factory : clonedFactory)
+ {
+ if (sendClose)
{
- if (sendClose)
- {
- factory.close();
- }
- else
- {
- factory.cleanup();
- }
+ factory.close();
}
-
- factories.clear();
+ else
+ {
+ factory.cleanup();
+ }
}
if (shutdownPool)
@@ -1403,8 +1411,14 @@
if (actMember != null && actMember.getConnector().getA() != null &&
actMember.getConnector().getB() != null)
{
- for (ClientSessionFactory factory : factories)
+ HashSet<ClientSessionFactory> clonedFactories = new
HashSet<ClientSessionFactory>();
+ synchronized (factories)
{
+ clonedFactories.addAll(factories);
+ }
+
+ for (ClientSessionFactory factory : clonedFactories)
+ {
((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().getA(),
actMember.getConnector().getB());
}
@@ -1444,6 +1458,7 @@
"]";
}
+ @SuppressWarnings("unchecked")
private synchronized void updateArraysAndPairs()
{
Collection<TopologyMember> membersCopy = topology.getMembers();
@@ -1462,13 +1477,14 @@
{
List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- this.initialConnectors =
(TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+
+ TransportConfiguration[] newInitialconnectors =
(TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
newConnectors.size());
int count = 0;
for (DiscoveryEntry entry : newConnectors)
{
- this.initialConnectors[count++] = entry.getConnector();
+ newInitialconnectors[count++] = entry.getConnector();
if (ha && topology.getMember(entry.getNodeID()) == null)
{
@@ -1477,33 +1493,53 @@
topology.updateMember(0, entry.getNodeID(), member);
}
}
+
+ this.initialConnectors = newInitialconnectors;
if (clusterConnection && !receivedTopology &&
initialConnectors.length > 0)
{
- // FIXME the node is alone in the cluster. We create a connection to the new
node
+ // The node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
- try
+
+ Runnable connectRunnable = new Runnable()
{
- connect();
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ };
+ if (startExecutor != null)
+ {
+ startExecutor.execute(connectRunnable);
}
- catch (Exception e)
+ else
{
- e.printStackTrace(); // To change body of catch statement use File | Settings
| File Templates.
+ connectRunnable.run();
}
}
}
- public synchronized void factoryClosed(final ClientSessionFactory factory)
+ public void factoryClosed(final ClientSessionFactory factory)
{
- factories.remove(factory);
-
- if (!clusterConnection && factories.isEmpty())
+ synchronized (factories)
{
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topologyArray = null;
+ factories.remove(factory);
+
+ if (!clusterConnection && factories.isEmpty())
+ {
+ // Go back to using the broadcast or static list
+
+ receivedTopology = false;
+
+ topologyArray = null;
+ }
}
}
@@ -1522,29 +1558,30 @@
topology.removeClusterTopologyListener(listener);
}
- private synchronized void addFactory(ClientSessionFactoryInternal factory)
+ private void addFactory(ClientSessionFactoryInternal factory)
{
if (factory == null)
{
return;
}
- synchronized (factories)
+ if (isClosed())
{
- if (isClosed())
- {
- factory.close();
- return;
- }
+ factory.close();
+ return;
+ }
- TransportConfiguration backup = null;
+ TransportConfiguration backup = null;
- if (ha)
- {
- backup =
topology.getBackupForConnector(factory.getConnectorConfiguration());
- }
+ if (ha)
+ {
+ backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+ }
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+ factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+
+ synchronized (factories)
+ {
factories.add(factory);
}
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -225,6 +225,11 @@
}
else
{
+ /* always add the backup: better to try to reconnect to something that's not there than to not know about it at all */
+ if(currentMember.getB() == null && memberInput.getB() != null)
+ {
+ currentMember.setB(memberInput.getB());
+ }
return false;
}
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/BridgeConfiguration.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -68,6 +68,10 @@
private final long maxRetryInterval;
private final int minLargeMessageSize;
+
+ // At this point this is only changed on testcases
+ // The bridge shouldn't be sending blocking anyways
+ private long callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
/**
* For backward compatibility on the API... no MinLargeMessage on this constructor
@@ -445,4 +449,26 @@
{
this.password = password;
}
+
+
+ /**
+ * @return the callTimeout
+ */
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ /**
+ *
+ * At this point this is only changed on testcases
+ * The bridge shouldn't be sending blocking anyways
+ * @param callTimeout the callTimeout to set
+ */
+ public void setCallTimeout(long callTimeout)
+ {
+ this.callTimeout = callTimeout;
+ }
+
+
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/Configuration.java 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/Configuration.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -473,6 +473,15 @@
* Sets the file system directory used to store bindings.
*/
void setBindingsDirectory(String dir);
+
+ /** The max number of concurrent reads allowed on paging.
+ *
+ * Default = 5 */
+ int getPageMaxConcurrentIO();
+
+ /** The max number of concurrent reads allowed on paging.
+ * Default = 5 */
+ void setPageMaxConcurrentIO(int maxIO);
/**
* Returns the file system directory used to store journal log.
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -89,6 +89,8 @@
public static final String DEFAULT_PAGING_DIR = "data/paging";
public static final String DEFAULT_LARGE_MESSAGES_DIR =
"data/largemessages";
+
+ public static final int DEFAULT_MAX_CONCURRENT_PAGE_IO = 5;
public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
@@ -266,6 +268,8 @@
protected String pagingDirectory = ConfigurationImpl.DEFAULT_PAGING_DIR;
// File related attributes
-----------------------------------------------------------
+
+ protected int maxConcurrentPageIO = ConfigurationImpl.DEFAULT_MAX_CONCURRENT_PAGE_IO;
protected String largeMessagesDirectory =
ConfigurationImpl.DEFAULT_LARGE_MESSAGES_DIR;
@@ -624,7 +628,25 @@
{
bindingsDirectory = dir;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.config.Configuration#getPageMaxConcurrentIO()
+ */
+ public int getPageMaxConcurrentIO()
+ {
+ return maxConcurrentPageIO;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.config.Configuration#setPageMaxConcurrentIO(int)
+ */
+ public void setPageMaxConcurrentIO(int maxIO)
+ {
+ this.maxConcurrentPageIO = maxIO;
+ }
+
+
public String getJournalDirectory()
{
return journalDirectory;
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -424,6 +424,12 @@
"journal-directory",
config.getJournalDirectory(),
Validators.NOT_NULL_OR_EMPTY));
+
+
+ config.setPageMaxConcurrentIO(XMLConfigurationUtil.getInteger(e,
+
"page-max-concurrent-io",
+ 5,
+
Validators.MINUS_ONE_OR_GT_ZERO));
config.setPagingDirectory(XMLConfigurationUtil.getString(e,
"paging-directory",
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -35,6 +35,14 @@
/** The SequentialFile will call this method when a disk IO Error happens during the
live phase. */
void onIOError(int errorCode, String message, SequentialFile file);
+ /** used for cases where you need direct buffer outside of the journal context.
+ * This is because the native layer has a method that can be reused in certain cases
like paging */
+ ByteBuffer allocateDirectBuffer(int size);
+
+ /** releases a direct buffer that was obtained through allocateDirectBuffer for use outside of the journal context.
+ * This is because the native layer has a method that can be reused in certain cases like paging */
+ void releaseDirectBuffer(ByteBuffer buffer);
+
/**
* Note: You need to release the buffer if is used for reading operations.
* You don't need to do it if using writing operations (AIO Buffer Lister
will take of writing operations)
@@ -55,6 +63,8 @@
int getAlignment();
int calculateBlockSize(int bytes);
+
+ String getDirectory();
void clearBuffer(ByteBuffer buffer);
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -113,7 +113,29 @@
{
return AsynchronousFileImpl.isLoaded();
}
+
+ public ByteBuffer allocateDirectBuffer(final int size)
+ {
+
+ int blocks = size / 512;
+ if (size % 512 != 0)
+ {
+ blocks ++;
+ }
+
+ // The buffer on AIO has to be a multiple of 512
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(blocks * 512);
+
+ buffer.limit(size);
+ return buffer;
+ }
+
+ public void releaseDirectBuffer(final ByteBuffer buffer)
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
+
public ByteBuffer newBuffer(int size)
{
if (size % 512 != 0)
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -111,6 +111,11 @@
}
}
}
+
+ public String getDirectory()
+ {
+ return journalDir;
+ }
public void start()
{
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -99,6 +99,17 @@
{
return timedBuffer != null;
}
+
+
+ public ByteBuffer allocateDirectBuffer(final int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ public void releaseDirectBuffer(ByteBuffer buffer)
+ {
+ // Nothing we can do in this case; we just have to rely on the GC
+ }
public ByteBuffer newBuffer(final int size)
{
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -210,6 +210,18 @@
}
}
+ public String getTopology()
+ {
+ clearIO();
+ try
+ {
+ return clusterConnection.getTopology().describe();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
public Map<String, String> getNodes() throws Exception
{
clearIO();
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PagingStore.java 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PagingStore.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -43,6 +43,8 @@
int getCurrentWritingPage();
SimpleString getStoreName();
+
+ String getFolder();
AddressFullMessagePolicy getAddressFullMessagePolicy();
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PrintPages.java 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/PrintPages.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -105,9 +105,15 @@
for (SimpleString store : stores)
{
+ PagingStore pgStore = manager.getPageStore(store);
+ String folder = null;
+
+ if (pgStore != null)
+ {
+ folder = pgStore.getFolder();
+ }
System.out.println("####################################################################################################");
- System.out.println("Exploring store " + store);
- PagingStore pgStore = manager.getPageStore(store);
+ System.out.println("Exploring store " + store + " folder =
" + folder);
int pgid = (int)pgStore.getFirstPage();
for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
{
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -58,7 +58,7 @@
void scheduleCleanupCheck();
- void cleanupEntries() throws Exception;
+ void cleanupEntries(boolean completeDelete) throws Exception;
void disableAutoCleanup();
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -44,5 +44,8 @@
* @param variance
*/
void addInc(long id, int variance);
+
+ // used when deleting the counter
+ void delete() throws Exception;
}
\ No newline at end of file
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -183,6 +183,7 @@
{
page = pagingStore.createPage((int)pageId);
+ storageManager.beforePageRead();
page.open();
List<PagedMessage> pgdMessages = page.read(storageManager);
@@ -200,6 +201,7 @@
catch (Throwable ignored)
{
}
+ storageManager.afterPageRead();
cache.unlock();
}
}
@@ -455,8 +457,26 @@
// The page is not on cache any more
// We need to read the page-file before deleting it
// to make sure we remove any large-messages pending
- depagedPage.open();
- List<PagedMessage> pgdMessagesList =
depagedPage.read(storageManager);
+ storageManager.beforePageRead();
+
+ List<PagedMessage> pgdMessagesList = null;
+ try
+ {
+ depagedPage.open();
+ pgdMessagesList = depagedPage.read(storageManager);
+ }
+ finally
+ {
+ try
+ {
+ depagedPage.close();
+ }
+ catch (Exception e)
+ {
+ }
+
+ storageManager.afterPageRead();
+ }
depagedPage.close();
pgdMessages = pgdMessagesList.toArray(new
PagedMessage[pgdMessagesList.size()]);
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -175,6 +175,32 @@
}
}
+
+ public void delete() throws Exception
+ {
+ synchronized (this)
+ {
+ long tx = storage.generateUniqueID();
+
+ boolean txUsed = false;
+ for (Long record : incrementRecords)
+ {
+ txUsed = true;
+ storage.deleteIncrementRecord(tx, record.longValue());
+ }
+
+ if (recordID >= 0)
+ {
+ txUsed = true;
+ storage.deletePageCounter(tx, this.recordID);
+ }
+
+ if (txUsed)
+ {
+ storage.commit(tx);
+ }
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -201,7 +201,7 @@
{
try
{
- cleanupEntries();
+ cleanupEntries(false);
}
catch (Exception e)
{
@@ -215,8 +215,12 @@
/**
* It will cleanup all the records for completed pages
* */
- public void cleanupEntries() throws Exception
+ public void cleanupEntries(final boolean completeDelete) throws Exception
{
+ if (completeDelete)
+ {
+ counter.delete();
+ }
Transaction tx = new TransactionImpl(store);
boolean persist = false;
@@ -292,7 +296,10 @@
}
}
- cursorProvider.scheduleCleanup();
+ if (!completeDelete)
+ {
+ cursorProvider.scheduleCleanup();
+ }
}
});
}
@@ -1290,7 +1297,11 @@
public void remove()
{
deliveredCount.incrementAndGet();
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+ PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(position);
+ if (info != null)
+ {
+ info.remove(position);
+ }
}
/* (non-Javadoc)
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -269,6 +269,19 @@
{
return pageSize;
}
+
+ public String getFolder()
+ {
+ SequentialFileFactory factoryUsed = this.fileFactory;
+ if (factoryUsed != null)
+ {
+ return factoryUsed.getDirectory();
+ }
+ else
+ {
+ return null;
+ }
+ }
public boolean isPaging()
{
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -96,6 +97,34 @@
* in case of the pools are full
* @throws Exception */
void waitOnOperations() throws Exception;
+
+ /**
+ * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+ * otherwise the system may become unresponsive if too many destinations are reading
all at the same time.
+ * This is called before we read, so we can limit concurrent reads
+ * @throws Exception
+ */
+ void beforePageRead() throws Exception;
+
+ /**
+ * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+ * otherwise the system may become unresponsive if too many destinations are reading
all at the same time.
+ * This is called after we read, so we can limit concurrent reads
+ * @throws Exception
+ */
+ void afterPageRead() throws Exception;
+
+
+ /** AIO has an optimized buffer which has a method to release it
+ instead of the way NIO will release data based on GC.
+ These methods will use that buffer if the inner method supports it */
+ ByteBuffer allocateDirectBuffer(int size);
+
+ /** AIO has an optimized buffer which has a method to release it
+ instead of the way NIO will release data based on GC.
+ These methods will use that buffer if the inner method supports it */
+ void freeDirectuffer(ByteBuffer buffer);
+
void clearContext();
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -31,6 +31,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import javax.transaction.xa.Xid;
@@ -157,6 +158,8 @@
public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
+
+ private final Semaphore pageMaxConcurrentIO;
private final BatchingIDGenerator idGenerator;
@@ -167,6 +170,8 @@
private final Journal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
+
+ private SequentialFileFactory journalFF = null;
private volatile boolean started;
@@ -270,8 +275,6 @@
syncTransactional = config.isJournalSyncTransactional();
- SequentialFileFactory journalFF = null;
-
if (config.getJournalType() == JournalType.ASYNCIO)
{
JournalStorageManager.log.info("Using AIO Journal");
@@ -329,6 +332,15 @@
largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false,
criticalErrorListener);
perfBlastPages = config.getJournalPerfBlastPages();
+
+ if (config.getPageMaxConcurrentIO() != 1)
+ {
+ pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO());
+ }
+ else
+ {
+ pageMaxConcurrentIO = null;
+ }
}
public void clearContext()
@@ -1133,7 +1145,9 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
+
}
break;
@@ -1152,7 +1166,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + "
while reloading PAGE_CURSOR_COUNTER_VALUE, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1172,7 +1187,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + "
while reloading PAGE_CURSOR_COUNTER_INC, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1262,7 +1278,7 @@
{
if (msg.getRefCount() == 0)
{
- JournalStorageManager.log.debug("Large message: " +
msg.getMessageID() +
+ JournalStorageManager.log.info("Large message: " +
msg.getMessageID() +
" didn't have any associated
reference, file will be deleted");
msg.decrementDelayDeletionCount();
}
@@ -1598,6 +1614,45 @@
return info;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#beforePageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ if (pageMaxConcurrentIO != null)
+ {
+ pageMaxConcurrentIO.acquire();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#afterPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ if (pageMaxConcurrentIO != null)
+ {
+ pageMaxConcurrentIO.release();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return journalFF.allocateDirectBuffer(size);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ journalFF.releaseBuffer(buffer);
+ }
+
// Public
-----------------------------------------------------------------------------------
public Journal getMessageJournal()
@@ -1695,23 +1750,24 @@
if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
{
+ // for compatibility: cope with the old behaviour by copying the old file to avoid
message loss
long originalMessageID =
largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
- LargeServerMessage originalMessage =
(LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
+
+ SequentialFile currentFile =
createFileForLargeMessage(largeMessage.getMessageID(), true);
+
+ if (!currentFile.exists())
{
- // this could happen if the message was deleted but the file still exists as
the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setDurable(true);
- originalMessage.setMessageID(originalMessageID);
- messages.put(originalMessageID, originalMessage);
+ SequentialFile linkedFile = createFileForLargeMessage(originalMessageID,
true);
+ if (linkedFile.exists())
+ {
+ linkedFile.copyTo(currentFile);
+ linkedFile.close();
+ }
}
+
+ currentFile.close();
+ }
- originalMessage.incrementDelayDeletionCount();
-
- largeMessage.setLinkedMessage(originalMessage);
- }
return largeMessage;
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -48,8 +48,6 @@
// Attributes ----------------------------------------------------
private final JournalStorageManager storageManager;
-
- private LargeServerMessage linkMessage;
private long pendingRecordID = -1;
@@ -80,7 +78,6 @@
private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties
properties, final SequentialFile fileCopy, final long newID)
{
super(copy, properties);
- linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
bodySize = copy.bodySize;
@@ -191,7 +188,7 @@
checkDelete();
}
}
-
+
@Override
public BodyEncoder getBodyEncoder() throws HornetQException
{
@@ -203,27 +200,19 @@
{
if (getRefCount() <= 0)
{
- if (linkMessage != null)
+ if (LargeServerMessageImpl.isTrace)
{
- // This file is linked to another message, deleting the reference where it
belongs on this case
- linkMessage.decrementDelayDeletionCount();
+ LargeServerMessageImpl.log.trace("Deleting file " + file + "
as the usage was complete");
}
- else
- {
- if (LargeServerMessageImpl.isTrace)
- {
- LargeServerMessageImpl.log.trace("Deleting file " + file +
" as the usage was complete");
- }
- try
- {
- deleteFile();
- }
- catch (Exception e)
- {
- LargeServerMessageImpl.log.error(e.getMessage(), e);
- }
+ try
+ {
+ deleteFile();
}
+ catch (Exception e)
+ {
+ LargeServerMessageImpl.log.error(e.getMessage(), e);
+ }
}
}
@@ -319,15 +308,9 @@
{
long idToUse = messageID;
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- :
(LargeServerMessageImpl)linkMessage,
+ ServerMessage newMessage = new LargeServerMessageImpl(this,
properties,
newfile,
messageID);
@@ -338,60 +321,34 @@
@Override
public synchronized ServerMessage copy(final long newID)
{
- if (!paged)
+ try
{
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
- {
- idToUse = linkMessage.getMessageID();
- }
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
-
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ?
this
- :
(LargeServerMessageImpl)linkMessage,
- properties,
- newfile,
- newID);
+ validateFile();
+
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
durable);
+
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties,
newFile, newID);
+
return newMessage;
}
- else
+ catch (Exception e)
{
- try
- {
- validateFile();
-
- SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
durable);
-
- file.copyTo(newFile);
-
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this,
properties, newFile, newID);
-
- newMessage.linkMessage = null;
-
- newMessage.setPaged();
-
- return newMessage;
- }
- catch (Exception e)
- {
- log.warn("Error on copying large message this for DLA or Expiry",
e);
- return null;
- }
- finally
- {
- releaseResources();
- }
+ log.warn("Error on copying large message " + this + " for DLA or
Expiry", e);
+ return null;
}
+ finally
+ {
+ releaseResources();
+ }
}
- public SequentialFile getFile()
+ public SequentialFile getFile() throws Exception
{
+ validateFile();
return file;
}
@@ -463,32 +420,6 @@
}
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(final LargeServerMessage message)
- {
- if (file != null)
- {
- // Sanity check.. it shouldn't happen
- throw new IllegalStateException("LargeMessage file was already set");
- }
-
- linkMessage = message;
-
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
- try
- {
- openFile();
- bodySize = file.size();
- closeFile();
- }
- catch (Exception e)
- {
- throw new RuntimeException("could not setup linked file", e);
- }
- }
-
// Inner classes -------------------------------------------------
class DecodingContext implements BodyEncoder
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -89,21 +89,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#getLinkedMessage()
- */
- public LargeServerMessage getLinkedMessage()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
- public void setLinkedMessage(final LargeServerMessage message)
- {
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#isComplete()
*/
public boolean isComplete()
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence.impl.nullpm;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -591,4 +592,34 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#beforePageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#afterPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ // We can just have hope on GC here :-)
+ }
+
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -1181,6 +1181,8 @@
{
context.getTransaction().markAsRollbackOnly(new
HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));
}
+
+ message.decrementRefCount();
return false;
}
@@ -1196,6 +1198,7 @@
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+
}
byte[] duplicateIDBytes = message.getDuplicateIDBytes();
@@ -1219,6 +1222,8 @@
{
context.getTransaction().markAsRollbackOnly(new
HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage));
}
+
+ message.decrementRefCount();
return false;
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/LargeServerMessage.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -13,6 +13,7 @@
package org.hornetq.core.server;
+
/**
* A LargeMessage
*
@@ -26,9 +27,6 @@
{
void addBytes(byte[] bytes) throws Exception;
- /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we
specify a link between the messages */
- void setLinkedMessage(LargeServerMessage message);
-
void setPendingRecordID(long pendingRecordID);
long getPendingRecordID();
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -572,11 +572,19 @@
{
producer.send(dest, message);
}
- catch (HornetQException e)
+ catch (final HornetQException e)
{
log.warn("Unable to send message " + ref + ", will try again
once bridge reconnects", e);
refs.remove(ref);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ connectionFailed(e, false);
+ }
+ });
return HandleStatus.BUSY;
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
@@ -497,6 +498,14 @@
catch (Exception e)
{
log.warn("Unable to announce backup, retrying", e);
+
+ scheduledExecutor.schedule(new Runnable(){
+ public void run()
+ {
+ announceBackup();
+ }
+
+ }, retryInterval, TimeUnit.MILLISECONDS);
}
}
});
@@ -553,12 +562,20 @@
public void onConnection(ClientSessionFactoryInternal sf)
{
TopologyMember localMember = getLocalMember();
- sf.sendNodeAnnounce(localMember.getUniqueEventID(),
- manager.getNodeId(),
- false,
- localMember.getConnector().getA(),
- localMember.getConnector().getB());
+ if (localMember != null)
+ {
+ sf.sendNodeAnnounce(localMember.getUniqueEventID(),
+ manager.getNodeId(),
+ false,
+ localMember.getConnector().getA(),
+ localMember.getConnector().getB());
+ }
+ else
+ {
+ log.warn("LocalMember is not set at on ClusterConnection " + this);
+ }
+ // TODO: shouldn't we send the current time here? and change the current
topology?
// sf.sendNodeAnnounce(System.currentTimeMillis(),
// manager.getNodeId(),
// false,
@@ -674,6 +691,9 @@
serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
serverLocator.setCallTimeout(callTimeout);
+
+ // No producer flow control on the bridges, as we don't want to lock the
queues
+ serverLocator.setProducerWindowSize(-1);
if (retryInterval > 0)
{
@@ -913,6 +933,9 @@
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
targetLocator.setMinLargeMessageSize(minLargeMessageSize);
+ // No producer flow control on the bridges, as we don't want to lock the
queues
+ targetLocator.setProducerWindowSize(-1);
+
targetLocator.setAfterConnectionInternalListener(this);
targetLocator.setNodeID(nodeId);
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -471,6 +471,14 @@
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
+ //disable flow control
+ serverLocator.setProducerWindowSize(-1);
+
+ // This will be set to 30s unless it's changed from embedded / testing
+ // there is no reason to expose this timeout in the config
+ // since the Bridge is supposed to be non-blocking and fast
+ // We may expose this if we find a good use case
+ serverLocator.setCallTimeout(config.getCallTimeout());
if (!config.isUseDuplicateDetection())
{
log.debug("Bridge " + config.getName() +
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -318,6 +318,12 @@
public synchronized void start() throws Exception
{
+ if (started)
+ {
+ log.debug("Server already started!");
+ return;
+ }
+
log.debug("Starting server " + this);
OperationContextImpl.clearContext();
@@ -487,21 +493,33 @@
}
+ remotingService.stop();
+
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't
get to the client
// It may still be possible to have this scenario on a real failure (without the
use of XA)
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- session.close(true);
- if (!criticalIOError)
+ try
{
- session.waitContextCompletion();
+ storageManager.setContext(session.getSessionContext());
+ session.close(true);
+ if (!criticalIOError)
+ {
+ session.waitContextCompletion();
+ }
}
+ catch (Exception e)
+ {
+ // If anything went wrong with closing sessions.. we should ignore it
+ // such as transactions.. etc.
+ log.warn(e.getMessage(), e);
+ }
}
+
+ storageManager.clearContext();
- remotingService.stop();
-
synchronized (this)
{
// Stop the deployers
@@ -960,11 +978,6 @@
Queue queue = (Queue)binding.getBindable();
- if (queue.getPageSubscription() != null)
- {
- queue.getPageSubscription().close();
- }
-
if (queue.getConsumerCount() != 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete
queue " + queue.getName() +
@@ -987,14 +1000,27 @@
}
}
+ postOffice.removeBinding(queueName);
+
queue.deleteAllReferences();
if (queue.isDurable())
{
storageManager.deleteQueueBinding(queue.getID());
}
+
- postOffice.removeBinding(queueName);
+ if (queue.getPageSubscription() != null)
+ {
+ queue.getPageSubscription().close();
+ }
+
+ PageSubscription subs = queue.getPageSubscription();
+
+ if (subs != null)
+ {
+ subs.cleanupEntries(true);
+ }
}
public synchronized void registerActivateCallback(final ActivateCallback callback)
@@ -1170,6 +1196,15 @@
return "HornetQServerImpl::" + (nodeManager != null ?
"serverUUID=" + nodeManager.getUUID() : "");
}
}
+
+ /**
+ * For tests only, don't use this method as it's not part of the API
+ * @param factory
+ */
+ public void replaceQueueFactory(QueueFactory factory)
+ {
+ this.queueFactory = factory;
+ }
// Package protected
// ----------------------------------------------------------------------------
@@ -1181,26 +1216,6 @@
* Protected so tests can change this behaviour
* @param backupConnector
*/
- // protected FailoverManagerImpl createBackupConnectionFailoverManager(final
TransportConfiguration backupConnector,
- // final ExecutorService threadPool,
- // final ScheduledExecutorService scheduledPool)
- // {
- // return new FailoverManagerImpl((ClientSessionFactory)null,
- // backupConnector,
- // null,
- // false,
- // HornetQClient.DEFAULT_CALL_TIMEOUT,
- // HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- // HornetQClient.DEFAULT_CONNECTION_TTL,
- // 0,
- // 1.0d,
- // 0,
- // 1,
- // false,
- // threadPool,
- // scheduledPool,
- // null);
- // }
protected PagingManager createPagingManager()
{
@@ -1607,7 +1622,7 @@
for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
{
- log.info("Deleting pending large message as it wasn't completed:"
+ msgToDelete);
+ log.info("Deleting pending large message as it wasn't completed:
LargeMessageID:" + msgToDelete.getB());
LargeServerMessage msg = storageManager.createLargeMessage();
msg.setMessageID(msgToDelete.getB());
msg.setPendingRecordID(msgToDelete.getA());
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -303,17 +303,24 @@
this.executor = executor;
- checkQueueSizeFuture = scheduledExecutor.scheduleWithFixedDelay(new Runnable()
+ try
{
- public void run()
+ checkQueueSizeFuture = scheduledExecutor.scheduleWithFixedDelay(new Runnable()
{
- // This flag is periodically set to true. This enables the directDeliver flag
to be set to true if the queue
- // is empty
- // We don't want to evaluate that on every delivery since that's too
expensive
-
- checkDirect = true;
- }
- }, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
+ public void run()
+ {
+ // This flag is periodically set to true. This enables the directDeliver
flag to be set to true if the queue
+ // is empty
+ // We don't want to evaluate that on every delivery since that's
too expensive
+
+ checkDirect = true;
+ }
+ }, CHECK_QUEUE_SIZE_PERIOD, CHECK_QUEUE_SIZE_PERIOD, TimeUnit.MILLISECONDS);
+ }
+ catch (RejectedExecutionException ignored)
+ {
+ // This could happen on a server shutdown
+ }
}
// Bindable implementation
-------------------------------------------------------------------------------------
@@ -2203,7 +2210,8 @@
return status;
}
- private void postAcknowledge(final MessageReference ref)
+ // Protected as testcases may change this behaviour
+ protected void postAcknowledge(final MessageReference ref)
{
QueueImpl queue = (QueueImpl)ref.getQueue();
@@ -2219,6 +2227,15 @@
boolean durableRef = message.isDurable() && queue.durable;
+ try
+ {
+ message.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
+ }
+
if (durableRef)
{
int count = message.decrementDurableRefCount();
@@ -2250,15 +2267,6 @@
}
}
}
-
- try
- {
- message.decrementRefCount();
- }
- catch (Exception e)
- {
- QueueImpl.log.warn("Unable to decrement reference counting", e);
- }
}
void postRollback(final LinkedList<MessageReference> refs)
@@ -2328,6 +2336,10 @@
for (MessageReference ref : refsToAck)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace("rolling back " + ref);
+ }
try
{
if (ref.getQueue().checkRedelivery(ref, timeBase))
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -586,7 +587,7 @@
return messageQueue;
}
- public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long
messageID) throws Exception
+ public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long
messageID) throws Exception
{
if (browseOnly)
{
@@ -595,34 +596,78 @@
// Acknowledge acknowledges all refs delivered by the consumer up to and including
the one explicitly
// acknowledged
-
- MessageReference ref;
- do
+
+ // We use a transaction here as if the message is not found, we should rollback
anything done
+ // This could eventually happen on retries during transactions, and we need to make
sure we don't ACK things we are not supposed to acknowledge
+
+ boolean startedTransaction = false;
+
+ if (tx == null || autoCommitAcks)
{
- ref = deliveringRefs.poll();
-
- if (ref == null)
+ startedTransaction = true;
+ tx = new TransactionImpl(storageManager);
+ }
+
+ try
+ {
+
+ MessageReference ref;
+ do
{
- throw new IllegalStateException(System.identityHashCode(this) + " Could
not find reference on consumerID=" +
- id +
- ", messageId = " +
- messageID +
- " queue = " +
- messageQueue.getName() +
- " closed = " +
- closed);
+ ref = deliveringRefs.poll();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("ACKing ref " + ref + " on " + this);
+ }
+
+ if (ref == null)
+ {
+
+ HornetQException e = new HornetQException(HornetQException.ILLEGAL_STATE,
"Could not find reference on consumerID=" +
+ id +
+ ", messageId = " +
+ messageID +
+ " queue = " +
+ messageQueue.getName());
+ throw e;
+ }
+
+ ref.getQueue().acknowledge(tx, ref);
}
-
- if (autoCommitAcks || tx == null)
+ while (ref.getMessage().getMessageID() != messageID);
+
+ if (startedTransaction)
{
- ref.getQueue().acknowledge(ref);
+ tx.commit();
}
+ }
+ catch (HornetQException e)
+ {
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
else
{
- ref.getQueue().acknowledge(tx, ref);
+ tx.markAsRollbackOnly(e);
}
+ throw e;
}
- while (ref.getMessage().getMessageID() != messageID);
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ HornetQException hqex = new HornetQException(HornetQException.ILLEGAL_STATE,
e.getMessage());
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
+ else
+ {
+ tx.markAsRollbackOnly(hqex);
+ }
+ throw hqex;
+ }
}
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx,
final long messageID) throws Exception
@@ -885,7 +930,7 @@
int localChunkLen = 0;
localChunkLen = (int)Math.min(sizePendingLargeMessage -
positionPendingLargeMessage, minLargeMessageSize);
-
+
HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
context.encode(bodyBuffer, localChunkLen);
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -289,7 +289,14 @@
{
// We only rollback local txs on close, not XA tx branches
- rollback(failed);
+ try
+ {
+ rollback(false);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
@@ -579,6 +586,11 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
+
+ if (consumer == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Consumer "
+ consumerID + " wasn't created on the server");
+ }
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
@@ -605,7 +617,7 @@
}
}
- public void commit() throws Exception
+ public synchronized void commit() throws Exception
{
if (isTrace)
{
@@ -621,7 +633,7 @@
}
}
- public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
+ public synchronized void rollback(final boolean considerLastMessageAsDelivered) throws
Exception
{
if (tx == null)
{
@@ -642,7 +654,7 @@
}
}
- public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
+ public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws
Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -690,7 +702,7 @@
}
}
- public void xaEnd(final Xid xid) throws Exception
+ public synchronized void xaEnd(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -735,7 +747,7 @@
}
}
- public void xaForget(final Xid xid) throws Exception
+ public synchronized void xaForget(final Xid xid) throws Exception
{
long id = resourceManager.removeHeuristicCompletion(xid);
@@ -758,7 +770,7 @@
}
}
- public void xaJoin(final Xid xid) throws Exception
+ public synchronized void xaJoin(final Xid xid) throws Exception
{
Transaction theTx = resourceManager.getTransaction(xid);
@@ -781,7 +793,7 @@
}
}
- public void xaResume(final Xid xid) throws Exception
+ public synchronized void xaResume(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -816,7 +828,7 @@
}
}
- public void xaRollback(final Xid xid) throws Exception
+ public synchronized void xaRollback(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -865,7 +877,7 @@
}
}
- public void xaStart(final Xid xid) throws Exception
+ public synchronized void xaStart(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -888,7 +900,7 @@
}
}
- public void xaSuspend() throws Exception
+ public synchronized void xaSuspend() throws Exception
{
if (tx == null)
{
@@ -913,7 +925,7 @@
}
}
- public void xaPrepare(final Xid xid) throws Exception
+ public synchronized void xaPrepare(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -1062,7 +1074,7 @@
if (consumer == null)
{
- ServerSessionImpl.log.error("There is no consumer with id " +
consumerID);
+ ServerSessionImpl.log.debug("There is no consumer with id " +
consumerID);
return;
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -1,6 +1,7 @@
package org.hornetq.integration.spring;
import org.hornetq.spi.core.naming.BindingRegistry;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
/**
@@ -18,7 +19,14 @@
public Object lookup(String name)
{
- return factory.getBean(name);
+ try
+ {
+ return factory.getBean(name);
+ }
+ catch (NoSuchBeanDefinitionException e)
+ {
+ return null;
+ }
}
public boolean bind(String name, Object obj)
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -173,6 +173,11 @@
{
jmsServerManager.addQueueToJndi(managedQueue.getName(), jndi);
}
+
+ public void removeJNDI(String jndi) throws Exception
+ {
+ jmsServerManager.removeQueueFromJNDI(managedQueue.getName(), jndi);
+ }
public String[] getJNDIBindings()
{
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -91,7 +91,17 @@
{
jmsServerManager.addTopicToJndi(managedTopic.getName(), jndi);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.jms.management.TopicControl#removeJNDI(java.lang.String)
+ */
+ public void removeJNDI(String jndi) throws Exception
+ {
+ jmsServerManager.removeTopicFromJNDI(managedTopic.getName(), jndi);
+ }
+
+
public String[] getJNDIBindings()
{
return jmsServerManager.getJNDIOnTopic(managedTopic.getName());
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -419,6 +419,8 @@
public void runException() throws Exception
{
+ checkJNDI(jndi);
+
if (internalCreateQueue(queueName, selectorString, durable))
{
@@ -473,6 +475,8 @@
public void runException() throws Exception
{
+ checkJNDI(jndi);
+
if (internalCreateTopic(topicName))
{
HornetQDestination destination = topics.get(topicName);
@@ -512,6 +516,8 @@
public boolean addTopicToJndi(final String topicName, final String jndiBinding) throws
Exception
{
checkInitialised();
+
+ checkJNDI(jndiBinding);
HornetQTopic destination = topics.get(topicName);
if (destination == null)
@@ -551,6 +557,8 @@
{
checkInitialised();
+ checkJNDI(jndiBinding);
+
HornetQQueue destination = queues.get(queueName);
if (destination == null)
{
@@ -572,6 +580,8 @@
public boolean addConnectionFactoryToJNDI(final String name, final String jndiBinding)
throws Exception
{
checkInitialised();
+
+ checkJNDI(jndiBinding);
HornetQConnectionFactory factory = connectionFactories.get(name);
if (factory == null)
@@ -965,7 +975,7 @@
{
runAfterActive(new RunnableException()
{
-
+
public String toString()
{
return "createConnectionFactory for " + cfConfig.getName();
@@ -973,6 +983,7 @@
public void runException() throws Exception
{
+ checkJNDI(jndi);
HornetQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig);
@@ -1413,6 +1424,18 @@
list.add(jndiItem);
}
}
+
+ private void checkJNDI(final String ... jndiNames) throws NamingException
+ {
+
+ for (String jndiName : jndiNames)
+ {
+ if (registry.lookup(jndiName) != null)
+ {
+ throw new NamingException(jndiName + " already has an object
bound");
+ }
+ }
+ }
private boolean bindToJndi(final String jndiName, final Object objectToBind) throws
NamingException
{
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -85,7 +85,7 @@
HornetQRAConnectionMetaData.log.trace("getJMSMinorVersion()");
}
- return 1;
+ return 2;
}
/**
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAProperties.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAProperties.java 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAProperties.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -143,7 +143,7 @@
/**
* @param value the useJNDI to set
*/
- public void setUseJNDI(final boolean value)
+ public void setUseJNDI(final Boolean value)
{
useJNDI = value;
}
@@ -216,7 +216,7 @@
return setupAttempts;
}
- public void setSetupAttempts(int setupAttempts)
+ public void setSetupAttempts(Integer setupAttempts)
{
this.setupAttempts = setupAttempts;
}
@@ -226,7 +226,7 @@
return setupInterval;
}
- public void setSetupInterval(long setupInterval)
+ public void setSetupInterval(Long setupInterval)
{
this.setupInterval = setupInterval;
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -1100,7 +1100,7 @@
/**
* @param value the useJNDI to set
*/
- public void setUseJNDI(final boolean value)
+ public void setUseJNDI(final Boolean value)
{
raProperties.setUseJNDI(value);
}
@@ -1193,7 +1193,7 @@
return raProperties.getSetupAttempts();
}
- public void setSetupAttempts(int setupAttempts)
+ public void setSetupAttempts(Integer setupAttempts)
{
if (HornetQResourceAdapter.trace)
{
@@ -1211,7 +1211,7 @@
return raProperties.getSetupInterval();
}
- public void setSetupInterval(long interval)
+ public void setSetupInterval(Long interval)
{
if (HornetQResourceAdapter.trace)
{
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -370,6 +370,14 @@
spec.isUseLocalTx(),
spec.getTransactionTimeout());
+ result.addMetaData("resource-adapter", "inbound");
+ result.addMetaData("jms-session", "");
+ String clientID = ra.getClientID() == null?spec.getClientID():ra.getClientID();
+ if (clientID != null)
+ {
+ result.addMetaData("jms-client-id", clientID);
+ }
+
HornetQActivation.log.debug("Using queue connection " + result);
return result;
Modified: branches/Branch_2_2_AS7/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/Branch_2_2_AS7/tests/config/ConfigurationTest-full-config.xml 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/tests/config/ConfigurationTest-full-config.xml 2012-01-02
13:49:15 UTC (rev 11943)
@@ -36,6 +36,7 @@
<create-bindings-dir>false</create-bindings-dir>
<journal-directory>somedir2</journal-directory>
<create-journal-dir>false</create-journal-dir>
+ <page-max-concurrent-io>17</page-max-concurrent-io>
<journal-type>NIO</journal-type>
<journal-compact-min-files>123</journal-compact-min-files>
<journal-compact-percentage>33</journal-compact-percentage>
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/InterceptorTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -127,16 +128,38 @@
{
public boolean intercept(final Packet packet, final RemotingConnection connection)
throws HornetQException
{
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
return false;
}
-
+
return true;
}
}
+ /**
+ * @param packet the packet to check for the FORCED_DELIVERY_MESSAGE property
+ */
+ private boolean isForceDeliveryResponse(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+ {
+ SessionReceiveMessage msg = (SessionReceiveMessage) packet;
+ if
(msg.getMessage().containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private class MyInterceptor5 implements Interceptor
{
private final String key;
@@ -224,6 +247,12 @@
public boolean intercept(final Packet packet, final RemotingConnection connection)
throws HornetQException
{
+
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
SessionReceiveMessage p = (SessionReceiveMessage)packet;
@@ -262,6 +291,8 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
+
+ message.putIntProperty("count", i);
message.putStringProperty(InterceptorTest.key, "apple");
@@ -275,7 +306,11 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
-
+
+ assertNotNull(message);
+
+ assertEquals(i, message.getIntProperty("count").intValue());
+
Assert.assertEquals("orange",
message.getStringProperty(InterceptorTest.key));
}
@@ -413,7 +448,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
-
+
producer.send(message);
}
@@ -422,7 +457,7 @@
session.start();
ClientMessage message = consumer.receive(100);
-
+
Assert.assertNull(message);
session.close();
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -19,7 +19,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -82,7 +89,109 @@
}
}
}
+
+
+ /**
+ * This validates the case where a consumer tries to ack a message right after
+ * failover, but the consumer at the target server hasn't received the message yet.
+ * In that case the system should roll back any acks done and redeliver the messages.
+ */
+ public void testInvalidACK() throws Exception
+ {
+ HornetQServer server = createServer(false);
+ try
+ {
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setAckBatchSize(0);
+
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory cf = locator.createSessionFactory();
+
+
+ int numMessages = 100;
+
+ ClientSession sessionConsumer = cf.createSession(true, true, 0);
+
+ sessionConsumer.start();
+
+ sessionConsumer.createQueue(addressA, queueA, true);
+
+ ClientConsumer consumer = sessionConsumer.createConsumer(queueA);
+ // sending message
+ {
+ ClientSession sendSession = cf.createSession(false, true, true);
+
+ ClientProducer cp = sendSession.createProducer(addressA);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = sendSession.createMessage(true);
+ msg.putIntProperty("seq", i);
+ cp.send(msg);
+ }
+
+ sendSession.close();
+ }
+
+ {
+
+ ClientMessage msg = consumer.receive(5000);
+
+ // need to wait some time until all the possible references are sent to the
consumer
+ // as we need to guarantee the order on cancellation on this test
+ Thread.sleep(1000);
+
+ try
+ {
+ // pretending to be a misbehaving client doing an invalid ack right after
failover
+ ((ClientSessionInternal)sessionConsumer).acknowledge(0, 12343);
+ fail("supposed to throw an exception here");
+ }
+ catch (Exception e)
+ {
+ }
+
+ try
+ {
+ // pretending to be a misbehaving client doing an invalid ack right after
failover
+ ((ClientSessionInternal)sessionConsumer).acknowledge(3, 12343);
+ fail("supposed to throw an exception here");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ consumer.close();
+
+ consumer = sessionConsumer.createConsumer(queueA);
+
+
+ for (int i = 0 ; i < numMessages; i++)
+ {
+ msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("seq").intValue());
+ msg.acknowledge();
+ }
+ }
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void testAsyncConsumerNoAck() throws Exception
{
HornetQServer server = createServer(false);
Added:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
(rev 0)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.File;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * This test sends large messages in page mode, moves them to the expiry address and then
+ * to the DLQ, and checks that they are still received fine
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ExpiryLargeMessageTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ final SimpleString EXPIRY = new SimpleString("my-expiry");
+
+ final SimpleString DLQ = new SimpleString("my-DLQ");
+
+ final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+ final int messageSize = 10 * 1024;
+
+ // it has to be an even number
+ final int numberOfMessages = 50;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testExpiryMessagesThenDLQ() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setExpiryAddress(EXPIRY);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ setting.setMaxDeliveryAttempts(5);
+ setting.setMaxSizeBytes(50 * 1024);
+ setting.setPageSizeBytes(10 * 1024);
+ setting.setDeadLetterAddress(DLQ);
+ server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+ server.start();
+
+ try
+ {
+
+ server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+ server.createQueue(DLQ, DLQ, null, true, false);
+
+ server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ byte bufferSample[] = new byte[messageSize];
+
+ for (int i = 0; i < bufferSample.length; i++)
+ {
+ bufferSample[i] = getSamplebyte(i);
+ }
+
+ ClientProducer producer = session.createProducer(MY_QUEUE);
+
+ long timeToExpiry = System.currentTimeMillis() + 1000;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ // Alternate: even counts write the body directly, odd counts stream it as a large message
+ if (i % 2 == 0)
+ {
+ message.putBooleanProperty("tst-large", false);
+ message.getBodyBuffer().writeBytes(bufferSample);
+ }
+ else
+ {
+ message.putBooleanProperty("tst-large", true);
+ message.setBodyInputStream(createFakeLargeStream(messageSize));
+ }
+
+ message.setExpiration(timeToExpiry);
+
+ producer.send(message);
+ }
+
+ server.stop();
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ Thread.sleep(1500);
+
+ // just to try expiring
+ ClientConsumer cons = session.createConsumer(MY_QUEUE);
+ assertNull(cons.receive(1000));
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ // Consume half of the messages to make sure all the messages are paging (on the
second try)
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ cons.close();
+
+ for (int rep = 0; rep < 6; rep++)
+ {
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+
+ System.out.println("Trying " + rep);
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ if (rep == 0)
+ {
+ // restart the server at the first try
+ server.stop();
+ server.start();
+ }
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ session.start();
+ }
+
+ cons = session.createConsumer(EXPIRY);
+ session.start();
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ session.close();
+ sf.close();
+
+ for (int rep = 0; rep < 2; rep++)
+ {
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ cons = session.createConsumer(DLQ);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = cons.receive(5000);
+ assertNotNull(message);
+
+ if (i % 10 == 0)
+ {
+ System.out.println("Received " + i);
+ }
+
+ for (int location = 0; location < messageSize; location++)
+ {
+ assertEquals(getSamplebyte((long)location),
message.getBodyBuffer().readByte());
+ }
+ message.acknowledge();
+ }
+ if (rep == 0)
+ {
+ session.rollback();
+ session.close();
+ sf.close();
+ server.stop();
+ server.start();
+ }
+ }
+
+ session.commit();
+
+ assertNull(cons.receiveImmediate());
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+   /**
+    * Tests if the system would still cope with old data where the LargeMessage was linked to its
+    * previous copy.
+    * <p>
+    * Large messages are sent to MY_QUEUE with a short expiration and are expected to show up on
+    * EXPIRY. With the server stopped, the file of one expired message is renamed to the ID stored in
+    * {@code Message.HDR_ORIG_MESSAGE_ID}, simulating the old on-disk layout; after the restart every
+    * message must still be readable from EXPIRY with an intact body.
+    *
+    * @throws Exception
+    */
+   public void testCompatilityWithLinks() throws Exception
+   {
+      HornetQServer server = createServer(true);
+
+      server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+      AddressSettings setting = new AddressSettings();
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(-1);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setExpiryAddress(EXPIRY);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+      // NOTE(review): the same AddressSettings instance is registered for both matches, so the
+      // EXPIRY address also carries the expiry address set above - confirm this is intended.
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(-1);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+      server.start();
+
+      try
+      {
+
+         server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+         server.createQueue(DLQ, DLQ, null, true, false);
+
+         server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+         ServerLocator locator = createInVMNonHALocator();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(true, true, 0);
+
+         ClientProducer producer = session.createProducer(MY_QUEUE);
+
+         long timeToExpiry = System.currentTimeMillis() + 1000;
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(true);
+
+            message.putIntProperty("count", i);
+
+            // Everything is going to be a large message
+            message.putBooleanProperty("tst-large", true);
+            message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+            message.setExpiration(timeToExpiry);
+
+            producer.send(message);
+         }
+
+         server.stop();
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+         session.start();
+
+         // Sleep past the 1000 ms expiration stamped on the messages above
+         Thread.sleep(1500);
+
+         // Nothing may be delivered from the original queue any more
+         ClientConsumer cons = session.createConsumer(MY_QUEUE);
+         assertNull(cons.receive(1000));
+
+         session.close();
+
+         session = sf.createSession(false, false);
+
+         cons = session.createConsumer(EXPIRY);
+         session.start();
+
+         // Peek at one expired message (rolled back afterwards) just to learn its IDs
+         ClientMessage msg = cons.receive(5000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         session.rollback();
+
+         server.stop();
+
+         // rename the file, simulating old behaviour
+         long messageID = msg.getMessageID();
+         long oldID = msg.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+         File largeMessagesFileDir = new File(getLargeMessagesDir());
+         File oldFile = new File(largeMessagesFileDir, oldID + ".msg");
+         File currentFile = new File(largeMessagesFileDir, messageID + ".msg");
+
+         // File.renameTo reports failure through its return value only; without this check the
+         // test would pass without ever exercising the linked-file layout.
+         assertTrue("Could not rename " + currentFile + " to " + oldFile, currentFile.renameTo(oldFile));
+
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+         session.start();
+
+         cons = session.createConsumer(EXPIRY);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = cons.receive(5000);
+            assertNotNull(message);
+
+            if (i % 10 == 0)
+            {
+               System.out.println("Received " + i);
+            }
+
+            // Verify the complete body, byte by byte
+            for (int location = 0; location < messageSize; location++)
+            {
+               assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+            }
+            message.acknowledge();
+         }
+
+         session.commit();
+
+         session.close();
+         sf.close();
+         locator.close();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -13,7 +13,10 @@
package org.hornetq.tests.integration.client;
+import java.io.IOException;
import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -29,14 +32,25 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.utils.ExecutorFactory;
/**
* A LargeMessageTest
@@ -128,6 +142,9 @@
}
server.stop(false);
+
+ forceGC();
+
server.start();
server.stop();
@@ -251,8 +268,12 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new
HashMap<String, AddressSettings>());
-
+ server = createServer(true,
+ createDefaultConfig(isNetty()),
+ 10000,
+ 20000,
+ new HashMap<String, AddressSettings>());
+
// server.getConfiguration()
// .getInterceptorClassNames()
// .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -267,7 +288,7 @@
session = sf.createSession(false, true, true);
session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
+
server.getPagingManager().getPageStore(ADDRESS).startPaging();
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
@@ -279,19 +300,19 @@
producer.send(clientFile);
}
session.commit();
-
+
validateNoFilesOnLargeDir(10);
for (int h = 0; h < 5; h++)
{
session.close();
-
+
sf.close();
-
+
server.stop();
-
+
server.start();
-
+
sf = locator.createSessionFactory();
session = sf.createSession(false, false);
@@ -318,11 +339,11 @@
{
session.rollback();
}
-
+
session.close();
sf.close();
}
-
+
server.stop(false);
server.start();
@@ -360,8 +381,12 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new
HashMap<String, AddressSettings>());
-
+ server = createServer(true,
+ createDefaultConfig(isNetty()),
+ 10000,
+ 20000,
+ new HashMap<String, AddressSettings>());
+
// server.getConfiguration()
// .getInterceptorClassNames()
// .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
@@ -374,7 +399,7 @@
ClientSessionFactory sf = locator.createSessionFactory();
session = sf.createSession(true, false, false);
-
+
Xid xid1 = newXID();
Xid xid2 = newXID();
@@ -391,13 +416,11 @@
producer.send(clientFile);
}
session.end(xid1, XAResource.TMSUCCESS);
-
+
session.prepare(xid1);
-
session.start(xid2, XAResource.TMNOFLAGS);
-
for (int i = 0; i < 10; i++)
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
@@ -406,32 +429,32 @@
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
-
+
session.prepare(xid2);
-
+
session.close();
sf.close();
-
+
server.stop(false);
server.start();
-
- for (int start = 0 ; start < 2; start++)
+
+ for (int start = 0; start < 2; start++)
{
System.out.println("Start " + start);
-
+
sf = locator.createSessionFactory();
-
+
if (start == 0)
{
session = sf.createSession(true, false, false);
session.commit(xid1, false);
session.close();
}
-
+
session = sf.createSession(false, false, false);
ClientConsumer cons1 = session.createConsumer(ADDRESS);
session.start();
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
@@ -439,7 +462,7 @@
assertEquals(1, msg.getIntProperty("txid").intValue());
msg.acknowledge();
}
-
+
if (start == 1)
{
session.commit();
@@ -448,26 +471,26 @@
{
session.rollback();
}
-
+
session.close();
sf.close();
-
+
server.stop();
server.start();
}
server.stop();
-
+
validateNoFilesOnLargeDir(10);
-
+
server.start();
sf = locator.createSessionFactory();
-
+
session = sf.createSession(true, false, false);
session.rollback(xid2);
-
+
sf.close();
-
+
server.stop();
server.start();
server.stop();
@@ -494,6 +517,296 @@
}
}
+ public void testRestartBeforeDelete() throws Exception
+ {
+
+ class NoPostACKQueue extends QueueImpl
+ {
+
+ public NoPostACKQueue(long id,
+ SimpleString address,
+ SimpleString name,
+ Filter filter,
+ PageSubscription pageSubscription,
+ boolean durable,
+ boolean temporary,
+ ScheduledExecutorService scheduledExecutor,
+ PostOffice postOffice,
+ StorageManager storageManager,
+ HierarchicalRepository<AddressSettings>
addressSettingsRepository,
+ Executor executor)
+ {
+ super(id,
+ address,
+ name,
+ filter,
+ pageSubscription,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ executor);
+ }
+
+ protected void postAcknowledge(final MessageReference ref)
+ {
+ System.out.println("Ignoring postACK on message " + ref);
+ }
+ }
+
+ class NoPostACKQueueFactory implements QueueFactory
+ {
+
+ final StorageManager storageManager;
+
+ final PostOffice postOffice;
+
+ final ScheduledExecutorService scheduledExecutor;
+
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+ final ExecutorFactory execFactory;
+
+ public NoPostACKQueueFactory(StorageManager storageManager,
+ PostOffice postOffice,
+ ScheduledExecutorService scheduledExecutor,
+ HierarchicalRepository<AddressSettings>
addressSettingsRepository,
+ final ExecutorFactory execFactory)
+ {
+ this.storageManager = storageManager;
+ this.postOffice = postOffice;
+ this.scheduledExecutor = scheduledExecutor;
+ this.addressSettingsRepository = addressSettingsRepository;
+ this.execFactory = execFactory;
+ }
+
+ public Queue createQueue(long persistenceID,
+ SimpleString address,
+ SimpleString name,
+ Filter filter,
+ PageSubscription pageSubscription,
+ boolean durable,
+ boolean temporary)
+ {
+
+ return new NoPostACKQueue(persistenceID,
+ address,
+ name,
+ filter,
+ pageSubscription,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository,
+ execFactory.getExecutor());
+// return new QueueImpl(persistenceID,
+// address,
+// name,
+// filter,
+// pageSubscription,
+// durable,
+// temporary,
+// scheduledExecutor,
+// postOffice,
+// storageManager,
+// addressSettingsRepository,
+// execFactory.getExecutor());
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.QueueFactory#setPostOffice(org.hornetq.core.postoffice.PostOffice)
+ */
+ public void setPostOffice(PostOffice postOffice)
+ {
+ }
+
+ }
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, isNetty());
+ server.start();
+
+ QueueFactory original = server.getQueueFactory();
+
+ ((HornetQServerImpl)server).replaceQueueFactory(new
NoPostACKQueueFactory(server.getStorageManager(),
+
server.getPostOffice(),
+
server.getScheduledPool(),
+
server.getAddressSettingsRepository(),
+
server.getExecutorFactory()));
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.saveToOutputStream(new java.io.OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ }
+ });
+ msg.acknowledge();
+ session.commit();
+ }
+
+ ((HornetQServerImpl)server).replaceQueueFactory(original);
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+   /**
+    * Sends ten large messages, restarts the server, consumes and commits each message on a fresh
+    * session, restarts again and calls validateNoFilesOnLargeDir().
+    * <p>
+    * Unlike testRestartBeforeDelete this test never swaps the queue factory, so it neither captures
+    * nor re-installs one.
+    */
+   public void testConsumeAfterRestart() throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+      try
+      {
+         server = createServer(true, isNetty());
+         server.start();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, true, true);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            producer.send(clientFile);
+         }
+         session.commit();
+
+         session.close();
+         sf.close();
+
+         // Restart between producing and consuming
+         server.stop();
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false);
+
+         ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < 10; i++)
+         {
+            ClientMessage msg = cons.receive(5000);
+            assertNotNull(msg);
+            // Stream the whole body into a sink that discards it
+            msg.saveToOutputStream(new java.io.OutputStream()
+            {
+               @Override
+               public void write(int b) throws IOException
+               {
+               }
+            });
+            msg.acknowledge();
+            session.commit();
+         }
+
+         server.stop(false);
+         server.start();
+
+         server.stop();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
public static class LargeMessageTestInterceptorIgnoreLastPacket implements
Interceptor
{
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -149,6 +149,13 @@
producer.send(bytesMessage);
printPageStoreInfo(pagingStore);
+
+ timeout = System.currentTimeMillis() + 10000;
+
+ while (timeout > System.currentTimeMillis() &&
pagingStore.getNumberOfPages() != 1)
+ {
+ Thread.sleep(100);
+ }
assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page
is 1, but It was not.
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -30,6 +30,7 @@
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
import org.hornetq.core.server.HornetQServer;
@@ -73,7 +74,7 @@
{
return false;
}
-
+
/**
*
*/
@@ -92,34 +93,32 @@
public void testRollbackPartiallyConsumedBuffer() throws Exception
{
- for (int i = 0 ; i < 1; i++)
+ for (int i = 0; i < 1; i++)
{
log.info("#test " + i);
internalTestRollbackPartiallyConsumedBuffer(false);
tearDown();
setUp();
-
+
}
-
+
}
-
+
public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
{
internalTestRollbackPartiallyConsumedBuffer(true);
}
-
-
+
private void internalTestRollbackPartiallyConsumedBuffer(final boolean
redeliveryDelay) throws Exception
{
final int messageSize = 100 * 1024;
-
final ClientSession session;
try
{
server = createServer(true, isNetty());
-
+
AddressSettings settings = new AddressSettings();
if (redeliveryDelay)
{
@@ -130,7 +129,7 @@
}
}
settings.setMaxDeliveryAttempts(-1);
-
+
server.getAddressSettingsRepository().addMatch("#", settings);
server.start();
@@ -143,35 +142,36 @@
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- for (int i = 0 ; i < 20; i++)
+ for (int i = 0; i < 20; i++)
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
-
+
clientFile.putIntProperty("value", i);
-
+
producer.send(clientFile);
}
session.commit();
session.start();
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
final AtomicInteger errors = new AtomicInteger(0);
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
+
consumer.setMessageHandler(new MessageHandler()
{
int counter = 0;
+
public void onMessage(ClientMessage message)
{
message.getBodyBuffer().readByte();
System.out.println("message:" + message);
try
{
- if (counter ++ < 20)
+ if (counter++ < 20)
{
Thread.sleep(100);
System.out.println("Rollback");
@@ -183,7 +183,7 @@
message.acknowledge();
session.commit();
}
-
+
if (counter == 40)
{
latch.countDown();
@@ -197,7 +197,7 @@
}
}
});
-
+
assertTrue(latch.await(40, TimeUnit.SECONDS));
consumer.close();
@@ -974,7 +974,97 @@
}
}
}
+
+ /**
+ * Runs internalTestSentWithDuplicateID with isSimulateBridge = true, i.e. the duplicate ID is
+ * set through MessageImpl.HDR_BRIDGE_DUPLICATE_ID.
+ */
+ public void testSentWithDuplicateIDBridge() throws Exception
+ {
+ internalTestSentWithDuplicateID(true);
+ }
+ /**
+ * Runs internalTestSentWithDuplicateID with isSimulateBridge = false, i.e. the duplicate ID is
+ * set through Message.HDR_DUPLICATE_DETECTION_ID.
+ */
+ public void testSentWithDuplicateID() throws Exception
+ {
+ internalTestSentWithDuplicateID(false);
+ }
+
+   /**
+    * Sends ten large messages that all carry the same duplicate ID and expects exactly one of them
+    * to be delivered, with an intact body; afterwards calls validateNoFilesOnLargeDir().
+    *
+    * @param isSimulateBridge when true the ID is stored under MessageImpl.HDR_BRIDGE_DUPLICATE_ID,
+    *                         otherwise under Message.HDR_DUPLICATE_DETECTION_ID
+    * @throws Exception
+    */
+   private void internalTestSentWithDuplicateID(final boolean isSimulateBridge) throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         String someDuplicateInfo = "Anything";
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            if (isSimulateBridge)
+            {
+               clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
+            }
+            else
+            {
+               clientFile.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, someDuplicateInfo.getBytes());
+            }
+
+            producer.send(clientFile);
+         }
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         ClientMessage msg = consumer.receive(10000);
+
+         // Check for null before touching the body, so a missing message is reported as an
+         // assertion failure instead of a NullPointerException.
+         assertNotNull(msg);
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
+         }
+
+         msg.acknowledge();
+
+         // The nine remaining sends used the same duplicate ID and must not be delivered
+         assertNull(consumer.receiveImmediate());
+
+         session.commit();
+
+         validateNoFilesOnLargeDir();
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
public void testResendSmallStreamMessage() throws Exception
{
internalTestResendMessage(50000);
@@ -1729,7 +1819,6 @@
100);
}
-
public void testPageOnLargeMessage() throws Exception
{
testPageOnLargeMessage(true, false);
@@ -2600,7 +2689,7 @@
}
}
}
-
+
// JBPAPP-6237
public void testPageOnLargeMessageMultipleQueues() throws Exception
{
@@ -2754,7 +2843,6 @@
}
-
// JBPAPP-6237
public void testPageOnLargeMessageMultipleQueues2() throws Exception
{
@@ -2796,7 +2884,7 @@
for (int i = 0; i < 100; i++)
{
ClientMessage message = session.createMessage(true);
-
+
message.putIntProperty("msgID", msgId++);
message.putBooleanProperty("TestLarge", false);
@@ -2813,7 +2901,6 @@
producer.send(message);
}
-
for (int i = 0; i < 10; i++)
{
ClientMessage clientFile = createLargeClientMessage(session,
numberOfBytesBigMessage);
@@ -2830,34 +2917,34 @@
ClientConsumer consumer =
session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
session.start();
-
- for (int received = 0 ; received < 5; received++)
+
+ for (int received = 0; received < 5; received++)
{
for (int i = 0; i < 100; i++)
{
ClientMessage message2 =
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
-
+
Assert.assertNotNull(message2);
-
+
assertFalse(message2.getBooleanProperty("TestLarge"));
-
+
message2.acknowledge();
-
+
Assert.assertNotNull(message2);
}
-
+
for (int i = 0; i < 10; i++)
{
ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
-
+
Assert.assertNotNull(messageLarge);
-
+
assertTrue(messageLarge.getBooleanProperty("TestLarge"));
-
+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
-
+
messageLarge.acknowledge();
-
+
messageLarge.saveToOutputStream(bout);
byte[] body = bout.toByteArray();
assertEquals(numberOfBytesBigMessage, body.length);
@@ -2866,7 +2953,7 @@
assertEquals(getSamplebyte(bi), body[bi]);
}
}
-
+
session.rollback();
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -121,7 +121,7 @@
protected void tearDown() throws Exception
{
locator.close();
-
+
locator = null;
super.tearDown();
@@ -360,6 +360,107 @@
}
+   /**
+    * Sends and then consumes 500 messages of 10 KiB each on an address created with
+    * AddressFullMessagePolicy.BLOCK, using a producer window size of -1, committing every ten
+    * messages. Every message must be received within 5 seconds.
+    * <p>
+    * The locator, session factory and session created here are closed on the success path.
+    */
+   public void testSendOverBlockingNoFlowControl() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          AddressFullMessagePolicy.BLOCK,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 10 * 1024;
+
+      final int numberOfMessages = 500;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+         locator.setProducerWindowSize(-1);
+         // Threshold well above messageSize, so the 10 KiB bodies are not treated as large messages
+         locator.setMinLargeMessageSize(1024 * 1024);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+
+            if (i % 10 == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+
+         session.start();
+
+         ClientConsumer cons = session.createConsumer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = cons.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            if (i % 10 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         // Release the client resources created by this test
+         session.close();
+         sf.close();
+         locator.close();
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
public void testReceiveImmediate() throws Exception
{
clearData();
@@ -4181,6 +4282,146 @@
}
}
+   /**
+    * Sends 1000 messages to an address bound to two queues ("=1" and "=2"), deletes queue "=1"
+    * without consuming from it, consumes everything from queue "=2", expects the address to leave
+    * paging mode and finally restarts the server twice.
+    */
+   public void testTwoQueuesConsumeOneRestart() throws Exception
+   {
+      boolean persistentMessages = true;
+
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setClientFailureCheckPeriod(120000);
+         locator.setConnectionTTL(5000000);
+         locator.setCallTimeout(120000);
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+
+         session.commit();
+
+         session.start();
+
+         // Drop the first queue with all of its messages still unconsumed
+         session.deleteQueue(PagingTest.ADDRESS.concat("=1"));
+
+         // Close the first session and factory before replacing them, so they are not leaked
+         session.close();
+         sf.close();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = consumer.receive(500000);
+            assertNotNull(message);
+            message.acknowledge();
+
+            System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+         }
+
+         session.commit();
+
+         assertNull(consumer.receiveImmediate());
+
+         consumer.close();
+
+         // It's async, so need to wait a bit for it happening
+         long timeout = System.currentTimeMillis() + 10000;
+         while (timeout > System.currentTimeMillis() && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+         {
+            Thread.sleep(100);
+         }
+         assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+         server.stop();
+
+         server.start();
+
+         server.stop();
+         server.start();
+
+         sf.close();
+
+         locator.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -4397,7 +4638,7 @@
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+
pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
pgStoreAddress.getCursorProvier().cleanup();
@@ -4536,7 +4777,7 @@
for (int i = 0; i < 500; i++)
{
log.info("Received message " + i);
- message = cons.receive(5000);
+ message = cons.receive(10000);
assertNotNull(message);
message.acknowledge();
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -233,16 +233,15 @@
consumer = session.createConsumer(ADDRESS);
msg = consumer.receive(1000);
Assert.assertNotNull(msg);
- Assert.assertEquals(2, msg.getDeliveryCount());
+ Assert.assertEquals(strictUpdate ? 1 : 2, msg.getDeliveryCount());
session.close();
}
-
public void testInfiniteDedeliveryMessageOnPersistent() throws Exception
{
internaltestInfiniteDedeliveryMessageOnPersistent(false);
}
-
+
private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean strict)
throws Exception
{
setUp(strict);
@@ -255,9 +254,8 @@
session.commit();
session.close();
-
int expectedCount = 1;
- for (int i = 0 ; i < 700; i++)
+ for (int i = 0; i < 700; i++)
{
session = factory.createSession(false, false, false);
session.start();
@@ -277,10 +275,10 @@
factory.close();
server.stop();
-
+
setUp(false);
-
- for (int i = 0 ; i < 700; i++)
+
+ for (int i = 0; i < 700; i++)
{
session = factory.createSession(false, false, false);
session.start();
@@ -293,27 +291,26 @@
server.stop();
-
- JournalImpl journal = new
JournalImpl(server.getConfiguration().getJournalFileSize(),
- 2,
+ JournalImpl journal = new
JournalImpl(server.getConfiguration().getJournalFileSize(),
+ 2,
0,
- 0,
- new
NIOSequentialFileFactory(server.getConfiguration().getJournalDirectory()),
+ 0,
+ new
NIOSequentialFileFactory(server.getConfiguration()
+
.getJournalDirectory()),
"hornetq-data",
"hq",
1);
-
-
+
final AtomicInteger updates = new AtomicInteger();
-
+
journal.start();
journal.load(new LoaderCallback()
{
-
+
public void failedTransaction(long transactionID, List<RecordInfo>
records, List<RecordInfo> recordsToDelete)
{
}
-
+
public void updateRecord(RecordInfo info)
{
if (info.userRecordType == JournalStorageManager.UPDATE_DELIVERY_COUNT)
@@ -321,23 +318,22 @@
updates.incrementAndGet();
}
}
-
+
public void deleteRecord(long id)
{
}
-
+
public void addRecord(RecordInfo info)
{
}
-
+
public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
{
}
});
-
+
journal.stop();
-
-
+
assertEquals(7, updates.get());
}
Added:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
(rev 0)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.JMSTestBase;
+
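+/**
+ * Flow-control load test: one producer publishes TOTAL_MESSAGES_COUNT bytes messages of
+ * MSG_SIZE bytes each to a topic while CONSUMERS_COUNT durable subscribers consume them
+ * concurrently. Every subscriber must receive every message, in sending order.
+ *
+ * @author clebertsuconic
+ * @author Pavel Slavice
+ */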
+public class TestFlowControlOnIgnoreLargeMessageBodyTest extends JMSTestBase
+{
+
+ Logger log = Logger.getLogger(TestFlowControlOnIgnoreLargeMessageBodyTest.class);
+
+ private Topic topic;
+
+ private static int TOTAL_MESSAGES_COUNT = 20000;
+
+ private static int MSG_SIZE = 150 * 1024;
+
+ private final int CONSUMERS_COUNT = 5;
+
+ private static final String ATTR_MSG_COUNTER = "msgIdex";
+
+ protected int receiveTimeout = 10000;
+
+ private volatile boolean error = false;
+
+ /**
+  * Runs the base-class setup, then creates the topic "topicIn" bound to "/topic/topicIn"
+  * and looks it up so the test threads can publish and subscribe to it.
+  */
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ jmsServer.createTopic(true, "topicIn", "/topic/topicIn");
+ topic = (Topic)context.lookup("/topic/topicIn");
+ }
+
+ /**
+  * Always returns {@code false}.
+  * NOTE(review): presumably read by JMSTestBase to start the server without persistence - confirm there.
+  */
+ @Override
+ protected boolean usePersistence()
+ {
+ return false;
+ }
+
+ /**
+ * LoadProducer
+ */
+ class LoadProducer extends Thread
+ {
+ private final ConnectionFactory cf;
+
+ private final Topic topic;
+
+ private final int messagesCount;
+
+ private volatile boolean requestForStop = false;
+
+ private volatile boolean stopped = false;
+
+ private volatile int sentMessages = 0;
+
+ LoadProducer(final String name, final Topic topic, final ConnectionFactory cf,
final int messagesCount) throws Exception
+ {
+ super(name);
+ this.cf = cf;
+ this.topic = topic;
+ this.messagesCount = messagesCount;
+ }
+
+ public void sendStopRequest()
+ {
+ stopped = false;
+ requestForStop = true;
+ }
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
+
+ @Override
+ public void run()
+ {
+ stopped = false;
+ Connection connection = null;
+ Session session = null;
+ MessageProducer prod;
+ log.info("Starting producer for " + topic + " - " +
getName());
+ try
+ {
+ connection = cf.createConnection();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ prod = session.createProducer(topic);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i = 1; i <= messagesCount && !requestForStop; i++)
+ {
+ if (error)
+ {
+ break;
+ }
+ sentMessages++;
+ BytesMessage msg = session.createBytesMessage();
+
msg.setIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER, i);
+ msg.writeBytes(new
byte[TestFlowControlOnIgnoreLargeMessageBodyTest.MSG_SIZE]);
+ prod.send(msg);
+ if (i % 10 == 0)
+ {
+ session.commit();
+ }
+ if (i % 100 == 0)
+ {
+ log.info("Address " + topic + " sent " + i + "
messages");
+ }
+ }
+ System.out.println("Ending producer for " + topic + " - "
+ getName() + " messages " + sentMessages);
+ }
+ catch (Exception e)
+ {
+ error = true;
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ stopped = true;
+ }
+
+ public int getSentMessages()
+ {
+ return sentMessages;
+ }
+ }
+
+ /**
+ * LoadConsumer
+ */
+ class LoadConsumer extends Thread
+ {
+ private final ConnectionFactory cf;
+
+ private final Topic topic;
+
+ private volatile boolean requestForStop = false;
+
+ private volatile boolean stopped = false;
+
+ private volatile int receivedMessages = 0;
+
+ private final int numberOfMessages;
+
+ private int receiveTimeout = 0;
+
+ private final CountDownLatch consumerCreated;
+
+ LoadConsumer(final CountDownLatch consumerCreated,
+ final String name,
+ final Topic topic,
+ final ConnectionFactory cf,
+ final int receiveTimeout,
+ final int numberOfMessages)
+ {
+ super(name);
+ this.cf = cf;
+ this.topic = topic;
+ this.receiveTimeout = receiveTimeout;
+ this.numberOfMessages = numberOfMessages;
+ this.consumerCreated = consumerCreated;
+ }
+
+ public void sendStopRequest()
+ {
+ stopped = false;
+ requestForStop = true;
+ }
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
+
+ @Override
+ public void run()
+ {
+ Connection connection = null;
+ Session session = null;
+ stopped = false;
+ requestForStop = false;
+ System.out.println("Starting consumer for " + topic + " - "
+ getName());
+ try
+ {
+ connection = cf.createConnection();
+
+ connection.setClientID(getName());
+
+ connection.start();
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic,
getName());
+
+ consumerCreated.countDown();
+
+ int counter = 0;
+
+ while (counter < numberOfMessages && !requestForStop &&
!error)
+ {
+ if (counter == 0)
+ {
+ System.out.println("Starting to consume for " + topic +
" - " + getName());
+ }
+ BytesMessage msg = (BytesMessage)subscriber.receive(receiveTimeout);
+ if (msg == null)
+ {
+ System.out.println("Cannot get message in specified timeout:
" + topic + " - " + getName());
+ error = true;
+ }
+ else
+ {
+ counter++;
+ if
(msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) !=
counter)
+ {
+ error = true;
+ }
+ }
+ if (counter % 10 == 0)
+ {
+ session.commit();
+ }
+ if (counter % 100 == 0)
+ {
+ log.info("## " + getName() + " " + topic + "
received " + counter);
+ }
+ receivedMessages = counter;
+ }
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ System.out.println("Exception in consumer " + getName() + " :
" + e.getMessage());
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Cannot close session " +
e.getMessage());
+ }
+ }
+ if (connection != null)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Cannot close connection " +
e.getMessage());
+ }
+ }
+ }
+ stopped = true;
+ System.out.println("Stopping consumer for " + topic +
+ " - " +
+ getName() +
+ ", received " +
+ getReceivedMessages());
+ }
+
+ public int getReceivedMessages()
+ {
+ return receivedMessages;
+ }
+ }
+
+   /**
+    * Starts all consumers, waits until each has signalled the latch, then runs the producer
+    * and waits for every thread to finish. Passes only when the producer sent, and every
+    * consumer received, exactly {@code TOTAL_MESSAGES_COUNT} messages and the shared
+    * {@code error} flag was never raised.
+    */
+   public void testFlowControl()
+   {
+      try
+      {
+         LoadProducer producer = new LoadProducer("producer",
+                                                  topic,
+                                                  cf,
+                                                  TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+
+         LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];
+
+         CountDownLatch latch = new CountDownLatch(CONSUMERS_COUNT);
+
+         for (int i = 0; i < consumers.length; i++)
+         {
+            consumers[i] = new LoadConsumer(latch,
+                                            "consumer " + i,
+                                            topic,
+                                            cf,
+                                            receiveTimeout,
+                                            TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+         }
+
+         for (LoadConsumer consumer : consumers)
+         {
+            consumer.start();
+         }
+
+         // do not publish before every consumer has signalled the latch
+         latch.await();
+
+         producer.start();
+         producer.join();
+         for (LoadConsumer consumer : consumers)
+         {
+            consumer.join();
+         }
+
+         String errorMessage = null;
+         if (producer.getSentMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+         {
+            errorMessage = "Producer did not send defined count of messages";
+         }
+         else
+         {
+            for (LoadConsumer consumer : consumers)
+            {
+               if (consumer.getReceivedMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+               {
+                  errorMessage = "Consumer did not receive defined count of messages";
+                  break;
+               }
+            }
+         }
+
+         if (errorMessage != null)
+         {
+            System.err.println(" ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ");
+            System.err.println(errorMessage);
+         }
+         else
+         {
+            System.out.println(" OK ");
+         }
+
+         assertFalse(error);
+         assertNull(errorMessage);
+      }
+      catch (Exception e)
+      {
+         if (e instanceof InterruptedException)
+         {
+            // keep the interrupt visible to the test runner
+            Thread.currentThread().interrupt();
+         }
+         log.warn(e.getMessage(), e);
+         // an unexpected exception must fail the test instead of being logged and forgotten
+         fail("Unexpected exception: " + e);
+      }
+   }
+
+}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -143,7 +143,7 @@
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT,
messageFromClient.getBodyBuffer().readString());
- assertEquals(2, messageFromClient.getDeliveryCount());
+ assertEquals(1, messageFromClient.getDeliveryCount());
session.close();
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +25,8 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -39,6 +42,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -46,6 +52,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -263,6 +270,220 @@
}
+
+ /** Runs the message-loss scenario of internalTestMessageLoss with largeMessage = false. */
+ public void testLostMessageSimpleMessage() throws Exception
+ {
+ internalTestMessageLoss(false);
+ }
+
+ /** Runs the message-loss scenario of internalTestMessageLoss with largeMessage = true. */
+ public void testLostMessageLargeMessage() throws Exception
+ {
+ internalTestMessageLoss(true);
+ }
+
+   /**
+    * Verifies that a bridge recovers after the target server drops its sends.
+    * <p>
+    * An interceptor on server1 rejects every {@code SessionSendMessage}, and every
+    * {@code SessionSendContinuationMessage} that is the last chunk, while {@code ignoreSends}
+    * is set. The test sends through the bridge on server0, waits until three packets have
+    * been dropped, then lets traffic through and expects each message to arrive on server1
+    * exactly once. The dropped sends are meant to make the bridge fail with a timeout (the
+    * bridge call timeout is set to 500 ms); the bridge should still recover from that failure
+    * and reconnect.
+    *
+    * @param largeMessage when {@code true}, a fake 1024 * 1024 byte stream is set as the
+    *                     message body input stream
+    */
+   public void internalTestMessageLoss(final boolean largeMessage) throws Exception
+   {
+      class MyInterceptor implements Interceptor
+      {
+         // written by the test thread, read by the server threads running intercept():
+         // volatile so that clearing the flag is guaranteed to become visible to them
+         public volatile boolean ignoreSends = true;
+
+         public final CountDownLatch latch;
+
+         MyInterceptor(int numberOfIgnores)
+         {
+            latch = new CountDownLatch(numberOfIgnores);
+         }
+
+         /* (non-Javadoc)
+          * @see org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet, org.hornetq.spi.core.protocol.RemotingConnection)
+          */
+         public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+         {
+            if (ignoreSends && packet instanceof SessionSendMessage ||
+                ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage)packet).isContinues())
+            {
+               System.out.println("Ignored");
+               latch.countDown();
+               return false;
+            }
+            else
+            {
+               return true;
+            }
+         }
+
+      }
+
+      MyInterceptor myInterceptor = new MyInterceptor(3);
+
+      HornetQServer server0 = null;
+      HornetQServer server1 = null;
+      ServerLocator locator = null;
+      try
+      {
+         Map<String, Object> server0Params = new HashMap<String, Object>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<String, Object>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+         HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         connectors.put(server1tc.getName(), server1tc);
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         final int messageSize = 1024;
+
+         final int numMessages = 1;
+
+         ArrayList<String> connectorConfig = new ArrayList<String>();
+         connectorConfig.add(server1tc.getName());
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+                                                                           queueName0,
+                                                                           forwardAddress,
+                                                                           null,
+                                                                           null,
+                                                                           HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                                           HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                           HornetQClient.DEFAULT_CONNECTION_TTL,
+                                                                           1000,
+                                                                           HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+                                                                           1d,
+                                                                           -1,
+                                                                           false,
+                                                                           // Choose confirmation size to make sure acks
+                                                                           // are sent
+                                                                           numMessages * messageSize / 2,
+                                                                           connectorConfig,
+                                                                           false,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+         bridgeConfiguration.setCallTimeout(500);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+
+         // installed on the target server before server0 (which hosts the bridge) is started
+         server1.getRemotingService().addInterceptor(myInterceptor);
+
+         server0.start();
+         locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         final byte[] bytes = new byte[messageSize];
+
+         final SimpleString propKey = new SimpleString("testkey");
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session0.createMessage(true);
+
+            if (largeMessage)
+            {
+               message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
+            }
+
+            message.putIntProperty(propKey, i);
+
+            message.getBodyBuffer().writeBytes(bytes);
+
+            producer0.send(message);
+         }
+
+         // wait until the interceptor has dropped the expected number of packets, then let traffic through
+         myInterceptor.latch.await();
+         myInterceptor.ignoreSends = false;
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive(30000);
+
+            Assert.assertNotNull(message);
+
+            Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+            if (largeMessage)
+            {
+               readMessages(message);
+            }
+
+            message.acknowledge();
+         }
+
+         // no duplicate may have been delivered by the retries
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+
+      }
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+         // best-effort shutdown: failures (including a server that was never created) are ignored
+         try
+         {
+            server0.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+
+      assertEquals(0, loadQueues(server0).size());
+
+   }
+
/**
* @param server1Params
*/
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -360,7 +360,16 @@
assertEquals(1, connectionSet.size());
ClusterConnectionImpl ccon = (ClusterConnectionImpl)
connectionSet.iterator().next();
- Map<String, MessageFlowRecord> records = ccon.getRecords();
+ long timeout = System.currentTimeMillis() + 5000;
+ Map<String, MessageFlowRecord> records = null;
+ while (timeout > System.currentTimeMillis())
+ {
+ records = ccon.getRecords();
+ if (records != null && records.size() == 1)
+ {
+ break;
+ }
+ }
assertNotNull(records);
assertEquals(records.size(), 1);
getServer(1).getClusterManager().getClusterConnections();
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -44,6 +44,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.RandomUtil;
@@ -129,6 +130,361 @@
return sf.createSession(xa, autoCommitSends, autoCommitAcks);
}
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ /**
+  * Sends 500 durable messages from a background thread and crashes the session once the
+  * first 10 sends have completed (or 10 seconds have passed). A send that throws
+  * HornetQException is retried once. After waiting up to 5 seconds for the sender thread,
+  * the test expects to receive 500 messages on the same session.
+  * The in-VM node manager is told to pause 5000 ms (failoverPause); the call timeout is also 5000 ms.
+  */
+ public void testTimeoutOnFailover() throws Exception
+ {
+ locator.setCallTimeout(5000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setReconnectAttempts(-1);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ // counted down by the sender thread after each of its first 10 successful sends
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+ try
+ {
+ System.out.println("sending message: " + i);
+ producer.send(message);
+ if (i < 10)
+ {
+ latch.countDown();
+ }
+ }
+ catch (HornetQException e)
+ {
+ // this is our retry
+ try
+ {
+ producer.send(message);
+ }
+ catch (HornetQException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ // the result of await is not checked: the crash happens after at most 10 seconds either way
+ latch.await(10, TimeUnit.SECONDS);
+ log.info("crashing session");
+ crash(session);
+ t.join(5000);
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage m = consumer.receive(1000);
+ assertNotNull(m);
+ System.out.println("received message " + i);
+ // assertEquals(i, m.getIntProperty("counter").intValue());
+ }
+ }
+
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ /**
+  * Preloads 500 durable messages, then consumes them through a MessageHandler that
+  * acknowledges each message (block-on-acknowledge, ack batch size 0). The session is
+  * crashed once the message with counter 10 has been acknowledged (or after 10 seconds);
+  * the test then waits up to 60 seconds and expects 500 distinct counters to have been received.
+  */
+ public void testTimeoutOnFailoverConsume() throws Exception
+ {
+ locator.setCallTimeout(5000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ locator.setRetryInterval(500);
+ locator.setAckBatchSize(0);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+ producer.send(message);
+ }
+
+ // latch: released when counter 10 was acked; endLatch: released when 500 counters were seen
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch endLatch = new CountDownLatch(1);
+
+ final ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+
+ // keyed by the "counter" property, so a redelivered message does not increase the size
+ final Map<Integer, ClientMessage> received = new HashMap<Integer,
ClientMessage>();
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+
+ public void onMessage(ClientMessage message)
+ {
+ Integer counter = message.getIntProperty("counter");
+ received.put(counter, message);
+ try
+ {
+ log.info("acking message = id = " + message.getMessageID() +
+ ", counter = " +
+ message.getIntProperty("counter"));
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ // a failed ack skips the latch checks below; the message stays recorded in the map
+ e.printStackTrace();
+ return;
+ }
+ log.info("Acked counter = " + counter);
+ if (counter.equals(10))
+ {
+ latch.countDown();
+ }
+ if (received.size() == 500)
+ {
+ endLatch.countDown();
+ }
+ }
+
+ });
+ latch.await(10, TimeUnit.SECONDS);
+ log.info("crashing session");
+ crash(session);
+ endLatch.await(60, TimeUnit.SECONDS);
+ assertTrue("received only " + received.size(), received.size() == 500);
+
+ session.close();
+ }
+
+ /**
+  * Variant of the consume-during-failover scenario that uses a consumer window size of 0
+  * and a dedicated thread calling the blocking consumer.receive(20000).
+  * 500 durable messages are preloaded (the last one carries the boolean property "end" = true).
+  * The session is crashed once counter 10 has been acknowledged (or after 10 seconds);
+  * the test then waits up to 60 seconds, joins the consuming thread and expects 500
+  * distinct counters to have been received.
+  */
+ public void testTimeoutOnFailoverConsumeBlocked() throws Exception
+ {
+ locator.setCallTimeout(5000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setConsumerWindowSize(0);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ locator.setRetryInterval(500);
+ locator.setAckBatchSize(0);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+ message.putBooleanProperty("end", i == 499);
+ producer.send(message);
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch endLatch = new CountDownLatch(1);
+
+ final ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+
+ // keyed by the "counter" property, so a redelivered message does not increase the size
+ final Map<Integer, ClientMessage> received = new HashMap<Integer,
ClientMessage>();
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ ClientMessage message = null;
+ try
+ {
+ // loop ends on a null message (receive timed out) or after the message flagged "end"
+ while ((message = getMessage()) != null)
+ {
+ Integer counter = message.getIntProperty("counter");
+ received.put(counter, message);
+ try
+ {
+ log.info("acking message = id = " + message.getMessageID()
+
+ ", counter = " +
+ message.getIntProperty("counter"));
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ continue;
+ }
+ log.info("Acked counter = " + counter);
+ if (counter.equals(10))
+ {
+ latch.countDown();
+ }
+ if (received.size() == 500)
+ {
+ endLatch.countDown();
+ }
+
+ if (message.getBooleanProperty("end"))
+ {
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ // receives one message, retrying for as long as receive() throws; returns null on timeout
+ private ClientMessage getMessage()
+ {
+ while (true)
+ {
+ try
+ {
+ ClientMessage msg = consumer.receive(20000);
+ if (msg == null)
+ {
+ log.info("Returning null message on consuming");
+ }
+ return msg;
+ }
+ catch (Exception ignored)
+ {
+ // retry
+ ignored.printStackTrace();
+ }
+ }
+ }
+ };
+ t.start();
+ latch.await(10, TimeUnit.SECONDS);
+ log.info("crashing session");
+ crash(session);
+ endLatch.await(60, TimeUnit.SECONDS);
+ t.join();
+ assertTrue("received only " + received.size(), received.size() == 500);
+
+ session.close();
+ }
+
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ /**
+  * Sends 500 durable messages inside an XA transaction, ends and prepares it, crashes the
+  * session, then commits the prepared transaction on the same session. All 500 messages
+  * must then be receivable in sending order (the "counter" property is checked).
+  * The call timeout is 2000 ms while the in-VM node manager's failoverPause is 5000 ms.
+  */
+ public void testTimeoutOnFailoverTransactionCommit() throws Exception
+ {
+ locator.setCallTimeout(2000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setReconnectAttempts(-1);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, false, false);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+
+ System.out.println("sending message: " + i);
+ producer.send(message);
+
+ }
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ System.out.println("crashing session");
+ crash(false, session);
+
+ // commit of the already prepared transaction is issued after the crash
+ session.commit(xid, false);
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage m = consumer.receive(1000);
+ assertNotNull(m);
+ System.out.println("received message " + i);
+ assertEquals(i, m.getIntProperty("counter").intValue());
+ }
+ }
+
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ /**
+  * Sends 500 durable messages inside an XA transaction, ends and prepares it, crashes the
+  * session, then rolls the prepared transaction back on the same session. Afterwards no
+  * message may be received from the queue (a 1 second receive must return null).
+  * The call timeout is 2000 ms while the in-VM node manager's failoverPause is 5000 ms.
+  */
+ public void testTimeoutOnFailoverTransactionRollback() throws Exception
+ {
+ locator.setCallTimeout(2000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setReconnectAttempts(-1);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, false, false);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+
+ System.out.println("sending message: " + i);
+ producer.send(message);
+ }
+
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ System.out.println("crashing session");
+ crash(false, session);
+
+ // rollback of the already prepared transaction is issued after the crash
+ session.rollback(xid);
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+
+ ClientMessage m = consumer.receive(1000);
+ assertNull(m);
+
+ }
+
//
https://jira.jboss.org/browse/HORNETQ-522
public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
{
@@ -1334,7 +1690,7 @@
session2.end(xid, XAResource.TMSUCCESS);
- // session2.prepare(xid);
+ // session2.prepare(xid);
crash(session2);
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -47,6 +47,9 @@
try
{
startServers(2, 0, 1);
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+ waitForTopology(servers[2], 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -124,6 +127,9 @@
try
{
startServers(2, 0, 1);
+ waitForTopology(servers[0], 2);
+ waitForTopology(servers[1], 2);
+ waitForTopology(servers[2], 2);
setupSessionFactory(0, isNetty());
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -37,6 +37,8 @@
public State state = NOT_STARTED;
+ public long failoverPause = 0l;
+
public InVMNodeManager()
{
liveLock = new Semaphore(1);
@@ -73,6 +75,10 @@
}
}
while (true);
+ if(failoverPause > 0l)
+ {
+ Thread.sleep(failoverPause);
+ }
}
@Override
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -74,14 +74,17 @@
jmsServer.createConnectionFactory(false, nonPersisted, "/nonPersisted"
);
+ boolean ex = false;
try
{
jmsServer.addConnectionFactoryToJNDI("np", "/someCF");
- fail("Failure expected and the API let duplicates");
}
- catch (HornetQException expected)
+ catch (Exception expected)
{
+ ex = true;
}
+
+ assertTrue(ex);
openCon("/someCF");
@@ -101,14 +104,18 @@
jmsServer.start();
jmsServer.addConnectionFactoryToJNDI("tst", "/newJNDI");
+
+ ex = false;
try
{
jmsServer.addConnectionFactoryToJNDI("tst", "/newJNDI");
- fail("Failure expected and the API let duplicates");
}
- catch (HornetQException expected)
+ catch (Exception expected)
{
+ ex = true;
}
+ assertTrue(ex);
+
openCon("/someCF");
openCon("/someCF2");
openCon("/newJNDI");
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -28,6 +28,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -428,6 +429,24 @@
Assert.assertEquals(expiryAddress, queueControl.getExpiryAddress());
}
+ public void testDuplicateJNDI() throws Exception
+ {
+ String someQueue = RandomUtil.randomString();
+ String someOtherQueue = RandomUtil.randomString();
+ serverManager.createQueue(false, someQueue, null, true, someQueue,
"/duplicate");
+ boolean exception = false;
+ try
+ {
+ serverManager.createQueue(false, someOtherQueue, null, true, someOtherQueue,
"/duplicate");
+ }
+ catch (Exception e)
+ {
+ exception = true;
+ }
+
+ assertTrue(exception);
+ }
+
public void testExpireMessage() throws Exception
{
JMSQueueControl queueControl = createManagementControl();
@@ -1177,7 +1196,6 @@
conf.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf.setFileDeploymentEnabled(false);
server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
- server.start();
serverManager = new JMSServerManagerImpl(server);
context = new InVMContext();
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -298,6 +298,12 @@
{
return (String)proxy.invokeOperation("listConsumersAsJSON");
}
+
+ public void removeJNDI(String jndi) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
};
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -33,13 +33,15 @@
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSConnectionInfo;
import org.hornetq.api.jms.management.JMSConsumerInfo;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.JMSSessionInfo;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -47,11 +49,17 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.jms.server.management.JMSManagementService;
+import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.inflow.HornetQActivation;
+import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
+import org.hornetq.tests.unit.ra.MessageEndpointFactory;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
@@ -469,6 +477,173 @@
}
}
}
+
+
+ public void testStartActivationListConnections() throws Exception
+ {
+ try
+ {
+ startHornetQServer(InVMAcceptorFactory.class.getName());
+ HornetQDestination queue =
(HornetQDestination)HornetQJMSClient.createQueue("test");
+ serverManager.createQueue(false, "test", null, true,
"test");
+
+ JMSServerControl control = createManagementControl();
+
+ HornetQResourceAdapter ra = new HornetQResourceAdapter();
+
+
ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
+ ra.setUserName("userGlobal");
+ ra.setPassword("passwordGlobal");
+ ra.start(new org.hornetq.tests.unit.ra.BootstrapContext());
+ ra.setClientID("my-client-id");
+ ra.setUserName("user");
+ Connection conn = ra.getDefaultHornetQConnectionFactory().createConnection();
+
+ conn.close();
+
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+
+ spec.setResourceAdapter(ra);
+
+ spec.setUseJNDI(false);
+
+ spec.setPassword("password");
+
+ spec.setDestinationType("Topic");
+ spec.setDestination("test");
+
+ spec.setMinSession(1);
+ spec.setMaxSession(1);
+
+ HornetQActivation activation = new HornetQActivation(ra, new
MessageEndpointFactory(), spec);
+
+ activation.start();
+
+ String cons = control.listConnectionsAsJSON();
+
+ JMSConnectionInfo[] jmsConnectionInfos = JMSConnectionInfo.from(cons);
+
+ assertEquals(1, jmsConnectionInfos.length);
+
+ assertEquals("user", jmsConnectionInfos[0].getUsername());
+
+ assertEquals("my-client-id", jmsConnectionInfos[0].getClientID());
+
+ activation.stop();
+
+ ra.stop();
+
+ }
+ finally
+ {
+ try
+ {
+ /*if (connection != null)
+ {
+ connection.close();
+ }*/
+
+ if (serverManager != null)
+ {
+ //serverManager.destroyQueue(queueName);
+ serverManager.stop();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
+
+ public void testStartActivationOverrideListConnections() throws Exception
+ {
+ try
+ {
+ startHornetQServer(InVMAcceptorFactory.class.getName());
+ HornetQDestination queue =
(HornetQDestination)HornetQJMSClient.createQueue("test");
+ serverManager.createQueue(false, "test", null, true,
"test");
+
+ JMSServerControl control = createManagementControl();
+
+ HornetQResourceAdapter ra = new HornetQResourceAdapter();
+
+
ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
+ ra.setUserName("userGlobal");
+ ra.setPassword("passwordGlobal");
+ ra.start(new org.hornetq.tests.unit.ra.BootstrapContext());
+
+ Connection conn = ra.getDefaultHornetQConnectionFactory().createConnection();
+
+ conn.close();
+
+ HornetQActivationSpec spec = new HornetQActivationSpec();
+
+ spec.setResourceAdapter(ra);
+
+ spec.setUseJNDI(false);
+
+ spec.setClientId("my-client-id");
+
+ spec.setUser("user");
+ spec.setPassword("password");
+
+ spec.setDestinationType("Topic");
+ spec.setDestination("test");
+
+ spec.setMinSession(1);
+ spec.setMaxSession(1);
+
+ HornetQActivation activation = new HornetQActivation(ra, new
MessageEndpointFactory(), spec);
+
+ activation.start();
+
+ String cons = control.listConnectionsAsJSON();
+
+ JMSConnectionInfo[] jmsConnectionInfos = JMSConnectionInfo.from(cons);
+
+ assertEquals(1, jmsConnectionInfos.length);
+
+ assertEquals("user", jmsConnectionInfos[0].getUsername());
+
+ assertEquals("my-client-id", jmsConnectionInfos[0].getClientID());
+
+ activation.stop();
+
+ ra.stop();
+
+ }
+ finally
+ {
+ try
+ {
+ /*if (connection != null)
+ {
+ connection.close();
+ }*/
+
+ if (serverManager != null)
+ {
+ //serverManager.destroyQueue(queueName);
+ serverManager.stop();
+ }
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -101,6 +101,11 @@
return
(String)proxy.retrieveAttributeValue("staticConnectorNamePairsAsJSON");
}
+ public String getTopology()
+ {
+ return (String)proxy.retrieveAttributeValue("topology");
+ }
+
public Map<String, String> getNodes() throws Exception
{
return (Map<String,
String>)proxy.retrieveAttributeValue("nodes");
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/HornetQRATestBase.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -16,13 +16,8 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.integration.jboss.recovery.AS7RecoveryRegistry;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.tests.util.ServiceTestBase;
-import org.jboss.msc.service.*;
-import org.jboss.msc.value.Value;
-import org.jboss.tm.XAResourceRecovery;
-import org.jboss.tm.XAResourceRecoveryRegistry;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -34,14 +29,9 @@
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.*;
import javax.transaction.xa.XAResource;
-import java.io.PrintStream;
import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@@ -70,7 +60,6 @@
server = createServer(true, configuration);
server.start();
server.createQueue(MDBQUEUEPREFIXEDSIMPLE, MDBQUEUEPREFIXEDSIMPLE, null, true,
false);
- AS7RecoveryRegistry.container = new DummyServiceContainer();
}
@Override
@@ -95,7 +84,6 @@
}
public abstract boolean isSecure();
-
class DummyMessageEndpointFactory implements MessageEndpointFactory
{
private DummyMessageEndpoint endpoint;
@@ -223,247 +211,4 @@
}
}
}
-
-
- public static class DummyServiceContainer implements ServiceContainer
- {
- public void shutdown()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isShutdownComplete()
- {
- return false; //To change body of implemented methods use File | Settings |
File Templates.
- }
-
- public void addTerminateListener(TerminateListener terminateListener)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void awaitTermination() throws InterruptedException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void awaitTermination(long l, TimeUnit timeUnit) throws
InterruptedException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void dumpServices()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void dumpServices(PrintStream printStream)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public String getName()
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceController<?> getRequiredService(ServiceName serviceName)
throws ServiceNotFoundException
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceController<?> getService(ServiceName serviceName)
- {
- ServiceController<XAResourceRecoveryRegistry> controller = new
ServiceController<XAResourceRecoveryRegistry>()
- {
- public ServiceController<?> getParent()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public ServiceContainer getServiceContainer()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public Mode getMode()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public boolean compareAndSetMode(Mode mode, Mode mode1)
- {
- return false; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public void setMode(Mode mode)
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public State getState()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public Substate getSubstate()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public XAResourceRecoveryRegistry getValue() throws IllegalStateException
- {
- XAResourceRecoveryRegistry registry = new XAResourceRecoveryRegistry()
- {
- public void addXAResourceRecovery(XAResourceRecovery
xaResourceRecovery)
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public void removeXAResourceRecovery(XAResourceRecovery
xaResourceRecovery)
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
- };
- return registry; //To change body of implemented methods use File |
Settings | File Templates.
- }
-
- public Service<XAResourceRecoveryRegistry> getService() throws
IllegalStateException
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public ServiceName getName()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public ServiceName[] getAliases()
- {
- return new ServiceName[0]; //To change body of implemented methods use
File | Settings | File Templates.
- }
-
- public void addListener(ServiceListener<? super
XAResourceRecoveryRegistry> serviceListener)
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public void addListener(ServiceListener.Inheritance inheritance,
ServiceListener<Object> objectServiceListener)
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public void removeListener(ServiceListener<? super
XAResourceRecoveryRegistry> serviceListener)
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public StartException getStartException()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public void retry()
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public Set<ServiceName> getImmediateUnavailableDependencies()
- {
- return null; //To change body of implemented methods use File | Settings
| File Templates.
- }
- };
- return controller; //To change body of implemented methods use File | Settings
| File Templates.
- }
-
- public List<ServiceName> getServiceNames()
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public <T> ServiceBuilder<T> addServiceValue(ServiceName serviceName,
Value<? extends Service<T>> value)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public <T> ServiceBuilder<T> addService(ServiceName serviceName,
Service<T> tService)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addListener(ServiceListener<Object>
objectServiceListener)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addListener(ServiceListener<Object>...
serviceListeners)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addListener(Collection<ServiceListener<Object>>
serviceListeners)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addListener(ServiceListener.Inheritance inheritance,
ServiceListener<Object> objectServiceListener)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addListener(ServiceListener.Inheritance inheritance,
ServiceListener<Object>... serviceListeners)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addListener(ServiceListener.Inheritance inheritance,
Collection<ServiceListener<Object>> serviceListeners)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget removeListener(ServiceListener<Object>
objectServiceListener)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public Set<ServiceListener<Object>> getListeners()
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addDependency(ServiceName serviceName)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addDependency(ServiceName... serviceNames)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget addDependency(Collection<ServiceName> serviceNames)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget removeDependency(ServiceName serviceName)
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public Set<ServiceName> getDependencies()
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public ServiceTarget subTarget()
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
-
- public BatchServiceTarget batchTarget()
- {
- return null; //To change body of implemented methods use File | Settings | File
Templates.
- }
- }
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/ra/ResourceAdapterTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -12,17 +12,17 @@
*/
package org.hornetq.tests.integration.ra;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.ra.HornetQResourceAdapter;
-import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.tests.util.UnitTestCase;
-import javax.resource.ResourceException;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import java.lang.reflect.Method;
-import java.util.concurrent.CountDownLatch;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jul 7, 2010
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -57,6 +57,7 @@
super.setUp();
clearData();
startServer();
+ forceGC();
}
/**
@@ -507,7 +508,7 @@
long time = System.currentTimeMillis();
time += 1000;
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 10; i++)
{
ClientMessage message = session.createMessage(true);
message.putIntProperty("value", i);
@@ -521,7 +522,7 @@
session.start();
ClientConsumer consumer = session.createConsumer(atestq);
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0 ; i < 10; i++)
{
ClientMessage message = consumer.receive(15000);
assertNotNull(message);
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -5,6 +5,8 @@
import javax.jms.MessageListener;
import javax.jms.TextMessage;
+import org.hornetq.utils.ReusableLatch;
+
/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
@@ -12,6 +14,8 @@
public class ExampleListener implements MessageListener
{
public static String lastMessage = null;
+
+ public static ReusableLatch latch = new ReusableLatch();
public void onMessage(Message message)
{
@@ -24,5 +28,6 @@
throw new RuntimeException(e);
}
System.out.println("MESSAGE RECEIVED: " + lastMessage);
+ latch.countDown();
}
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -1,5 +1,7 @@
package org.hornetq.tests.integration.spring;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
@@ -34,9 +36,12 @@
{
MessageSender sender =
(MessageSender)context.getBean("MessageSender");
System.out.println("Sending message...");
+ ExampleListener.latch.countUp();
sender.send("Hello world");
- Thread.sleep(100);
+ ExampleListener.latch.await(10, TimeUnit.SECONDS);
+ Thread.sleep(500);
Assert.assertEquals(ExampleListener.lastMessage, "Hello world");
+ System.out.println("done!");
((HornetQConnectionFactory)sender.getConnectionFactory()).close();
}
finally
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -70,6 +70,8 @@
Assert.assertEquals("pagingdir", conf.getPagingDirectory());
Assert.assertEquals("somedir", conf.getBindingsDirectory());
Assert.assertEquals(false, conf.isCreateBindingsDir());
+
+ Assert.assertEquals(17, conf.getPageMaxConcurrentIO());
Assert.assertEquals("somedir2", conf.getJournalDirectory());
Assert.assertEquals(false, conf.isCreateJournalDir());
Assert.assertEquals(JournalType.NIO, conf.getJournalType());
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -48,13 +48,19 @@
public void testStartStop1WithWhitespace() throws Exception
{
testStartStop1("fdm test file.xml");
- testStartStop1("fdm\ttest\tfile.xml");
+ if (!isWindows())
+ {
+ testStartStop1("fdm\ttest\tfile.xml");
+ }
}
public void testStartStop2WithWhitespace() throws Exception
{
testStartStop2("fdm test file.xml");
- testStartStop2("fdm\ttest\tfile.xml");
+ if (!isWindows())
+ {
+ testStartStop2("fdm\ttest\tfile.xml");
+ }
}
private void testStartStop1(final String filename) throws Exception
@@ -67,6 +73,8 @@
FileDeploymentManagerTest.log.debug(file.getAbsoluteFile());
+ System.out.println("========file name: " + file.getAbsolutePath());
+
file.createNewFile();
FakeDeployer deployer = new FakeDeployer(filename);
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -764,4 +764,28 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.SequentialFileFactory#releaseDirectBuffer(java.nio.ByteBuffer)
+ */
+ public void releaseDirectBuffer(ByteBuffer buffer)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#getDirectory()
+ */
+ public String getDirectory()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.paging.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1706,6 +1707,42 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#beforePageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#afterPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
@@ -1783,6 +1820,41 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#beforePageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#afterPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/jms/misc/ManifestTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/jms/misc/ManifestTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/jms/misc/ManifestTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -75,7 +75,7 @@
Attributes attrs = manifest.getMainAttributes();
Assert.assertEquals(meta.getProviderVersion(),
attrs.getValue("HornetQ-Version"));
-
Assert.assertEquals("https://svn.jboss.org/repos/hornetq/branches/Br...;,
attrs.getValue("HornetQ-SVN-URL"));
+
Assert.assertEquals("https://svn.jboss.org/repos/hornetq/branches/Br...;,
attrs.getValue("HornetQ-SVN-URL"));
}
finally
{
Added: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java
(rev 0)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/BootstrapContext.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -0,0 +1,65 @@
+package org.hornetq.tests.unit.ra;
+
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.XATerminator;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+import java.util.Timer;
+
+public class BootstrapContext implements javax.resource.spi.BootstrapContext
+{
+ public Timer createTimer() throws UnavailableException
+ {
+ return null;
+ }
+
+ public WorkManager getWorkManager()
+ {
+ return new WorkManager()
+ {
+ public void doWork(final Work work) throws WorkException
+ {
+ }
+
+ public void doWork(final Work work,
+ final long l,
+ final ExecutionContext executionContext,
+ final WorkListener workListener) throws WorkException
+ {
+ }
+
+ public long startWork(final Work work) throws WorkException
+ {
+ return 0;
+ }
+
+ public long startWork(final Work work,
+ final long l,
+ final ExecutionContext executionContext,
+ final WorkListener workListener) throws WorkException
+ {
+ return 0;
+ }
+
+ public void scheduleWork(final Work work) throws WorkException
+ {
+ work.run();
+ }
+
+ public void scheduleWork(final Work work,
+ final long l,
+ final ExecutionContext executionContext,
+ final WorkListener workListener) throws WorkException
+ {
+ }
+ };
+ }
+
+ public XATerminator getXATerminator()
+ {
+ return null;
+ }
+}
\ No newline at end of file
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -72,13 +72,13 @@
" <config-property>\n" +
" <description>Does we support HA</description>\n"
+
"
<config-property-name>HA</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value>false</config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>Use A local Transaction instead of
XA?</description>\n" +
"
<config-property-name>UseLocalTx</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value>false</config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
@@ -108,19 +108,19 @@
" <config-property>\n" +
" <description>The discovery group
port</description>\n" +
"
<config-property-name>DiscoveryPort</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The discovery refresh
timeout</description>\n" +
"
<config-property-name>DiscoveryRefreshTimeout</config-property-name>\n"
+
- "
<config-property-type>java.lang.Long</config-property-type>\n" +
+ "
<config-property-type>long</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The discovery initial wait
timeout</description>\n" +
"
<config-property-name>DiscoveryInitialWaitTimeout</config-property-name>\n"
+
- "
<config-property-type>java.lang.Long</config-property-type>\n" +
+ "
<config-property-type>long</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property> \n" +
" <config-property>\n" +
@@ -132,103 +132,103 @@
" <config-property>\n" +
" <description>number of reconnect attempts for connections
after failover occurs</description>\n" +
"
<config-property-name>ReconnectAttempts</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The client failure check
period</description>\n" +
"
<config-property-name>ClientFailureCheckPeriod</config-property-name>\n"
+
- "
<config-property-type>java.lang.Long</config-property-type>\n" +
+ "
<config-property-type>long</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The connection TTL</description>\n"
+
"
<config-property-name>ConnectionTTL</config-property-name>\n" +
- "
<config-property-type>java.lang.Long</config-property-type>\n" +
+ "
<config-property-type>long</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The call timeout</description>\n" +
"
<config-property-name>CallTimeout</config-property-name>\n" +
- "
<config-property-type>java.lang.Long</config-property-type>\n" +
+ "
<config-property-type>long</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The dups ok batch
size</description>\n" +
"
<config-property-name>DupsOKBatchSize</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The transaction batch
size</description>\n" +
"
<config-property-name>TransactionBatchSize</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The consumer window
size</description>\n" +
"
<config-property-name>ConsumerWindowSize</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The consumer max
rate</description>\n" +
"
<config-property-name>ConsumerMaxRate</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The confirmation window
size</description>\n" +
"
<config-property-name>ConfirmationWindowSize</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The producer max
rate</description>\n" +
"
<config-property-name>ProducerMaxRate</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The min large message
size</description>\n" +
"
<config-property-name>MinLargeMessageSize</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The block on
acknowledge</description>\n" +
"
<config-property-name>BlockOnAcknowledge</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The block on non durable
send</description>\n" +
"
<config-property-name>BlockOnNonDurableSend</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The block on durable
send</description>\n" +
"
<config-property-name>BlockOnDurableSend</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The auto group</description>\n" +
"
<config-property-name>AutoGroup</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The pre acknowledge</description>\n"
+
"
<config-property-name>PreAcknowledge</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>The retry interval</description>\n"
+
"
<config-property-name>RetryInterval</config-property-name>\n" +
- "
<config-property-type>java.lang.Long</config-property-type>\n" +
+ "
<config-property-type>long</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
@@ -246,19 +246,19 @@
" <config-property>\n" +
" <description>use global pools for
client</description>\n" +
"
<config-property-name>UseGlobalPools</config-property-name>\n" +
- "
<config-property-type>java.lang.Boolean</config-property-type>\n" +
+ "
<config-property-type>boolean</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>max number of threads for scheduled threrad
pool</description>\n" +
"
<config-property-name>ScheduledThreadPoolMaxSize</config-property-name>\n"
+
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
" <description>max number of threads in
pool</description>\n" +
"
<config-property-name>ThreadPoolMaxSize</config-property-name>\n" +
- "
<config-property-type>java.lang.Integer</config-property-type>\n" +
+ "
<config-property-type>int</config-property-type>\n" +
"
<config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>\n" +
@@ -325,7 +325,7 @@
System.out.println("configPropertyName = " + configPropertyName);
Method setter = methodList.remove("set" + configPropertyName);
assertNotNull("setter " + configPropertyName + " does not
exist", setter);
- Class c = setter.getParameterTypes()[0];
+ Class c = lookupType(setter);
elementsByTagName = el.getElementsByTagName("config-property-type");
assertEquals("setter " + configPropertyName + " has no type
set", elementsByTagName.getLength(), 1);
Node configPropertyTypeNode = elementsByTagName.item(0);
@@ -341,7 +341,7 @@
newConfig.append("\" <config-property>\" +
\n");
newConfig.append("\"
<description>***add***</description>\" + \n");
newConfig.append("\"
<config-property-name>").append(method.getName().substring(3)).append("</config-property-name>\"
+ \n");
- newConfig.append("\"
<config-property-type>").append(method.getParameterTypes()[0].getName()).append("</config-property-type>\"
+ \n");
+ newConfig.append("\"
<config-property-type>").append(lookupType(method).getName()).append("</config-property-type>\"
+ \n");
newConfig.append("\"
<config-property-value></config-property-value>\" + \n");
newConfig.append("\" </config-property>\" +
\n");
}
@@ -353,4 +353,30 @@
System.out.println(commentedOutConfigs);
}
}
+
+   /**
+    * Determines the property type accepted by a setter, reporting the supported wrapper
+    * classes as their primitive counterparts.
+    *
+    * @param setter the setter to inspect; only its first declared parameter type is read
+    * @return {@code boolean.class}, {@code long.class} or {@code int.class} when the parameter
+    *         is declared as {@code Boolean}, {@code Long} or {@code Integer} respectively;
+    *         otherwise the declared parameter type unchanged
+    */
+   private Class<?> lookupType(Method setter)
+   {
+      final Class<?> declared = setter.getParameterTypes()[0];
+
+      // Each supported wrapper maps straight to its primitive class.
+      if (declared == Boolean.class)
+      {
+         return boolean.class;
+      }
+      if (declared == Long.class)
+      {
+         return long.class;
+      }
+      if (declared == Integer.class)
+      {
+         return int.class;
+      }
+
+      // Everything else, including types that are already primitive, is returned as declared.
+      return declared;
+   }
}
Added:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java
(rev 0)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/MessageEndpointFactory.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -0,0 +1,27 @@
+package org.hornetq.tests.unit.ra;
+
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+
+/**
+ * Placeholder {@link javax.resource.spi.endpoint.MessageEndpointFactory} for unit tests that
+ * need a factory instance to construct an activation but never deliver a message through it.
+ */
+public class MessageEndpointFactory implements javax.resource.spi.endpoint.MessageEndpointFactory
+{
+
+   /**
+    * Reports every delivery as non-transacted.
+    *
+    * @param arg0 the endpoint method being queried (ignored)
+    * @return always {@code false}
+    */
+   public boolean isDeliveryTransacted(final Method arg0) throws NoSuchMethodException
+   {
+      return false;
+   }
+
+   /**
+    * Creates no endpoint.
+    *
+    * @param arg0 the XA resource offered by the caller (ignored)
+    * @return always {@code null}
+    */
+   public MessageEndpoint createEndpoint(final XAResource arg0) throws UnavailableException
+   {
+      return null;
+   }
+
+}
\ No newline at end of file
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -13,24 +13,11 @@
package org.hornetq.tests.unit.ra;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import java.util.Timer;
import javax.jms.Connection;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.UnavailableException;
-import javax.resource.spi.XATerminator;
-import javax.resource.spi.endpoint.MessageEndpoint;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.ExecutionContext;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkException;
-import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
-import javax.transaction.xa.XAResource;
import junit.framework.Assert;
@@ -43,7 +30,6 @@
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.netty.NettyConnector;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.integration.jboss.recovery.AS7RecoveryRegistry;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.ra.ConnectionFactoryProperties;
@@ -51,7 +37,6 @@
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
-import org.hornetq.tests.integration.ra.HornetQRATestBase;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -467,8 +452,7 @@
ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
ra.setUserName("userGlobal");
ra.setPassword("passwordGlobal");
- AS7RecoveryRegistry.container = new HornetQRATestBase.DummyServiceContainer();
- ra.start(fakeCTX);
+ ra.start(new org.hornetq.tests.unit.ra.BootstrapContext());
Connection conn = ra.getDefaultHornetQConnectionFactory().createConnection();
@@ -489,7 +473,7 @@
spec.setMinSession(1);
spec.setMaxSession(1);
- HornetQActivation activation = new HornetQActivation(ra, new
FakeMessageEndpointFactory(), spec);
+ HornetQActivation activation = new HornetQActivation(ra, new
MessageEndpointFactory(), spec);
activation.start();
activation.stop();
@@ -529,81 +513,4 @@
}*/
}
- BootstrapContext fakeCTX = new BootstrapContext()
- {
-
- public Timer createTimer() throws UnavailableException
- {
- return null;
- }
-
- public WorkManager getWorkManager()
- {
- return new WorkManager()
- {
- public void doWork(final Work work) throws WorkException
- {
- }
-
- public void doWork(final Work work,
- final long l,
- final ExecutionContext executionContext,
- final WorkListener workListener) throws WorkException
- {
- }
-
- public long startWork(final Work work) throws WorkException
- {
- return 0;
- }
-
- public long startWork(final Work work,
- final long l,
- final ExecutionContext executionContext,
- final WorkListener workListener) throws WorkException
- {
- return 0;
- }
-
- public void scheduleWork(final Work work) throws WorkException
- {
- work.run();
- }
-
- public void scheduleWork(final Work work,
- final long l,
- final ExecutionContext executionContext,
- final WorkListener workListener) throws
WorkException
- {
- }
- };
- }
-
- public XATerminator getXATerminator()
- {
- return null;
- }
-
- };
-
- class FakeMessageEndpointFactory implements MessageEndpointFactory
- {
-
- /* (non-Javadoc)
- * @see
javax.resource.spi.endpoint.MessageEndpointFactory#createEndpoint(javax.transaction.xa.XAResource)
- */
- public MessageEndpoint createEndpoint(final XAResource arg0) throws
UnavailableException
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see
javax.resource.spi.endpoint.MessageEndpointFactory#isDeliveryTransacted(java.lang.reflect.Method)
- */
- public boolean isDeliveryTransacted(final Method arg0) throws
NoSuchMethodException
- {
- return false;
- }
-
- }
}
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-12-23
16:54:27 UTC (rev 11942)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -96,9 +96,6 @@
}
locators.clear();
super.tearDown();
-// checkFreePort(5445);
-// checkFreePort(5446);
-// checkFreePort(5447);
if (InVMRegistry.instance.size() > 0)
{
fail("InVMREgistry size > 0");
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-12-23
16:54:27 UTC (rev 11942)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/util/UnitTestCase.java 2012-01-02
13:49:15 UTC (rev 11943)
@@ -138,6 +138,13 @@
checkThread = false;
}
+   /**
+    * Lower-cased value of the {@code os.name} system property, captured when the test
+    * instance is created. A fixed locale is used so the lower-casing does not depend on the
+    * JVM's default locale.
+    */
+   private String osType = System.getProperty("os.name").toLowerCase(java.util.Locale.ENGLISH);
+
+   /**
+    * Tells whether the tests are running on Windows.
+    * <p>
+    * The OS name is matched by prefix. Searching for "win" anywhere in the name would also
+    * accept names that merely contain it, such as "darwin".
+    *
+    * @return {@code true} if the lower-cased {@code os.name} starts with "win"
+    */
+   protected boolean isWindows()
+   {
+      return osType.startsWith("win");
+   }
+
// Static --------------------------------------------------------
protected Configuration createDefaultConfig()