JBoss hornetq SVN: r9620 - in branches/Branch_2_1/src/main/org/hornetq/core: deployers/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-31 18:06:01 -0400 (Tue, 31 Aug 2010)
New Revision: 9620
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/config/impl/FileConfiguration.java
branches/Branch_2_1/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
Log:
https://jira.jboss.org/browse/HORNETQ-478 - eliminating extra warning
Modified: branches/Branch_2_1/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-08-31 16:29:21 UTC (rev 9619)
+++ branches/Branch_2_1/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-08-31 22:06:01 UTC (rev 9620)
@@ -77,6 +77,11 @@
Element e = org.hornetq.utils.XMLUtil.stringToElement(xml);
FileConfigurationParser parser = new FileConfigurationParser();
+
+ // https://jira.jboss.org/browse/HORNETQ-478 - We only want to validate AIO when
+ // starting the server
+ // and we don't want to do it when deploying hornetq-queues.xml which uses the same parser and XML format
+ parser.setValidateAIO(true);
parser.parseMainConfig(e, this);
Modified: branches/Branch_2_1/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-08-31 16:29:21 UTC (rev 9619)
+++ branches/Branch_2_1/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-08-31 22:06:01 UTC (rev 9620)
@@ -123,13 +123,31 @@
private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route";
// Attributes ----------------------------------------------------
+
+ private boolean validateAIO = false;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
+ /**
+ * @return the validateAIO
+ */
+ public boolean isValidateAIO()
+ {
+ return validateAIO;
+ }
+ /**
+ * @param validateAIO the validateAIO to set
+ */
+ public void setValidateAIO(boolean validateAIO)
+ {
+ this.validateAIO = validateAIO;
+ }
+
public Configuration parseMainConfig(final InputStream input) throws Exception
{
@@ -431,7 +449,10 @@
}
else
{
- log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+ if (validateAIO)
+ {
+ log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+ }
config.setJournalType(JournalType.NIO);
}
13 years, 7 months
JBoss hornetq SVN: r9619 - in trunk: src and 14 other directories.
by do-not-reply@jboss.org
Author: bill.burke(a)jboss.com
Date: 2010-08-31 12:29:21 -0400 (Tue, 31 Aug 2010)
New Revision: 9619
Added:
trunk/src/Hornetq.iml
trunk/src/main/org/hornetq/core/registry/
trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java
trunk/src/main/org/hornetq/core/registry/MapBindingRegistry.java
trunk/src/main/org/hornetq/core/server/embedded/
trunk/src/main/org/hornetq/core/server/embedded/EmbeddedHornetQ.java
trunk/src/main/org/hornetq/integration/spring/
trunk/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java
trunk/src/main/org/hornetq/integration/spring/SpringJmsBootstrap.java
trunk/src/main/org/hornetq/jms/server/embedded/
trunk/src/main/org/hornetq/jms/server/embedded/EmbeddedJMS.java
trunk/src/main/org/hornetq/spi/BindingRegistry.java
trunk/tests/config/spring-hornetq-config.xml
trunk/tests/config/spring-hornetq-jms.xml
trunk/tests/config/spring-jms-beans.xml
trunk/tests/jms-tests/Jms-tests.iml
trunk/tests/src/org/hornetq/tests/integration/spring/
trunk/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java
trunk/tests/src/org/hornetq/tests/integration/spring/MessageSender.java
trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
Modified:
trunk/build-hornetq.xml
trunk/build.xml
trunk/pom.xml
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
BindingRegistry and Spring Integration
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-08-31 16:09:43 UTC (rev 9618)
+++ trunk/build-hornetq.xml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -65,6 +65,8 @@
<property name="jnp.client.jar.name" value="jnp-client.jar"/>
<property name="jboss.integration.jar.name" value="hornetq-jboss-as-integration.jar"/>
<property name="jboss.integration.sources.jar.name" value="hornetq-jboss-as-integration-sources.jar"/>
+ <property name="spring.integration.jar.name" value="hornetq-spring-integration.jar"/>
+ <property name="spring.integration.sources.jar.name" value="hornetq-spring-integration-sources.jar"/>
<property name="twitter.integration.jar.name" value="hornetq-twitter-integration.jar"/>
<property name="twitter.integration.sources.jar.name" value="hornetq-twitter-integration-sources.jar"/>
<property name="bootstrap.jar.name" value="hornetq-bootstrap.jar"/>
@@ -94,6 +96,7 @@
<property name="build.jms.classes.dir" value="${build.dir}/classes/jms"/>
<property name="build.jms.java5.classes.dir" value="${build.dir}/classes/jms-java5"/>
<property name="build.jboss.integration.classes.dir" value="${build.dir}/classes/jboss-integration"/>
+ <property name="build.spring.integration.classes.dir" value="${build.dir}/classes/spring-integration"/>
<property name="build.twitter.integration.classes.dir" value="${build.dir}/classes/twitter-integration"/>
<property name="build.service.classes.dir" value="${build.dir}/classes/service"/>
<property name="build.bootstrap.classes.dir" value="${build.dir}/classes/bootstrap"/>
@@ -218,6 +221,13 @@
<path refid="org.twitter4j.classpath"/>
</path>
+ <path id="spring.integration.compilation.classpath">
+ <path location="${build.core.classes.dir}"/>
+ <path location="${build.jms.classes.dir}"/>
+ <path refid="org.jboss.javaee.classpath"/>
+ <path refid="org.springframework.classpath"/>
+ </path>
+
<path id="jboss.service.compilation.classpath">
<path refid="org.jboss.javaee.classpath"/>
<path location="${build.core.classes.dir}"/>
@@ -247,11 +257,13 @@
<path refid="bootstrap.compilation.classpath"/>
<path refid="junit.junit.classpath"/>
<path refid="org.twitter4j.classpath"/>
+ <path refid="org.springframework.classpath"/>
<path location="${build.jars.dir}/${ra.jar.name}"/>
<path location="${build.jars.dir}/${jms.jar.name}"/>
<path location="${build.jars.dir}/${jboss.integration.jar.name}"/>
<path location="${build.jars.dir}/${bootstrap.jar.name}"/>
<path location="${build.jars.dir}/${logging.jar.name}"/>
+ <path location="${build.jars.dir}/${spring.integration.jar.name}"/>
<path location="${build.jars.dir}/${twitter.integration.jar.name}"/>
</path>
@@ -305,6 +317,7 @@
<!-- we must include Apache commons logging -->
<!-- as a transitive dependency from JBoss TM -->
<path refid="apache.logging.classpath"/>
+ <path refid="org.springframework.classpath"/>
<path refid="org.twitter4j.classpath"/>
</path>
@@ -382,6 +395,7 @@
<mkdir dir="${build.jms.classes.dir}"/>
<mkdir dir="${build.jms.java5.classes.dir}"/>
<mkdir dir="${build.jboss.integration.classes.dir}"/>
+ <mkdir dir="${build.spring.integration.classes.dir}"/>
<mkdir dir="${build.twitter.integration.classes.dir}"/>
<mkdir dir="${build.service.classes.dir}"/>
<mkdir dir="${build.bootstrap.classes.dir}"/>
@@ -580,6 +594,26 @@
</javac>
</target>
+ <target name="compile-spring-integration" depends="compile-core">
+ <javac destdir="${build.spring.integration.classes.dir}"
+ target="${javac.target}"
+ source="${javac.source}"
+ optimize="${javac.optimize}"
+ debug="${javac.debug}"
+ depend="${javac.depend}"
+ verbose="${javac.verbose}"
+ deprecation="${javac.deprecation}"
+ includeAntRuntime="${javac.include.ant.runtime}"
+ includeJavaRuntime="${javac.include.java.runtime}"
+ failonerror="${javac.fail.onerror}">
+ <src>
+ <pathelement path="${src.main.dir}"/>
+ </src>
+ <include name="org/hornetq/integration/spring/**/*.java"/>
+ <classpath refid="spring.integration.compilation.classpath"/>
+ </javac>
+ </target>
+
<!-- author: Lucas Amador -->
<target name="compile-jboss-service" depends="compile-core">
<javac destdir="${build.service.classes.dir}"
@@ -720,11 +754,11 @@
<!-- ======================================================================================== -->
<target name="sources-jar" description="create jar files containing source code"
- depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources">
+ depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources, jar-spring-integration-sources">
</target>
<target name="jar"
- depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration">
+ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration, jar-spring-integration">
</target>
<target name="jar-jnp-client" depends="init">
@@ -884,6 +918,22 @@
</jar>
</target>
+ <target name="jar-spring-integration" depends="compile-spring-integration">
+
+ <jar jarfile="${build.jars.dir}/${spring.integration.jar.name}">
+ <fileset dir="${build.spring.integration.classes.dir}" includes="**"/>
+ </jar>
+
+ </target>
+
+ <target name="jar-spring-integration-sources">
+ <jar jarfile="${build.jars.dir}/${spring.integration.sources.jar.name}">
+ <fileset dir="${src.main.dir}">
+ <include name="org/hornetq/integration/spring/**/*.java"/>
+ </fileset>
+ </jar>
+ </target>
+
<!-- author: Lucas Amador -->
<target name="jar-jboss-service" depends="compile-jboss-service">
@@ -1148,6 +1198,7 @@
<include name="${jms.client.jar.name}"/>
<include name="${jms.client.java5.jar.name}"/>
<include name="${jnp.client.jar.name}"/>
+ <include name="${spring.integration.jar.name}"/>
<include name="${twitter.integration.jar.name}"/>
</fileset>
<fileset dir="${org.jboss.naming.lib}">
@@ -1490,6 +1541,12 @@
</antcall>
</target>
+ <target name="spring-tests" depends="jar, compile-unit-tests">
+ <antcall inheritall="true" inheritrefs="true" target="tests">
+ <param name="tests.param" value="**/org/hornetq/tests/integration/spring/*${test-mask}.class"/>
+ </antcall>
+ </target>
+
<target name="failover-tests" depends="jar, compile-unit-tests">
<antcall inheritall="true" inheritrefs="true" target="tests">
<param name="tests.param" value="**/org/hornetq/tests/integration/cluster/failover/**/*${test-mask}.class"/>
Modified: trunk/build.xml
===================================================================
--- trunk/build.xml 2010-08-31 16:09:43 UTC (rev 9618)
+++ trunk/build.xml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -243,7 +243,12 @@
<ant antfile="build-hornetq.xml" target="compile-reports"/>
</target>
+ <target name="spring-tests" depends="createthirdparty">
+ <ant antfile="build-hornetq.xml" target="spring-tests"/>
+ <ant antfile="build-hornetq.xml" target="compile-reports"/>
+ </target>
+
<target name="failover-tests" depends="createthirdparty">
<ant antfile="build-hornetq.xml" target="failover-tests"/>
<ant antfile="build-hornetq.xml" target="compile-reports"/>
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-08-31 16:09:43 UTC (rev 9618)
+++ trunk/pom.xml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -284,6 +284,27 @@
<artifactId>apiviz</artifactId>
<version>1.3.0.GA</version>
</dependency>
+ <!-- needed for spring integration -->
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ <version>3.0.3.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ <version>3.0.3.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>3.0.3.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <version>3.0.3.RELEASE</version>
+ </dependency>
</dependencies>
Added: trunk/src/Hornetq.iml
===================================================================
--- trunk/src/Hornetq.iml (rev 0)
+++ trunk/src/Hornetq.iml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" inherit-compiler-output="true">
+ <exclude-output />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/main" isTestSource="false" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../thirdparty/org/twitter4j/lib/twitter4j-core.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="library" name="lib7" level="project" />
+ <orderEntry type="library" name="lib4" level="project" />
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../thirdparty/org/jboss/netty/lib/netty.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="library" name="lib2" level="project" />
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../thirdparty/org/jboss/integration/lib/jboss-transaction-spi.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="library" name="lib3" level="project" />
+ <orderEntry type="library" name="lib6" level="project" />
+ <orderEntry type="library" name="jars" level="project" />
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../tools/maven/lib/maven-2.0.9-uber.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="library" name="lib1" level="project" />
+ <orderEntry type="library" name="lib9" level="project" />
+ <orderEntry type="library" name="lib-spring" level="project" />
+ </component>
+</module>
+
Added: trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java
===================================================================
--- trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java (rev 0)
+++ trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,108 @@
+package org.hornetq.core.registry;
+
+import org.hornetq.spi.BindingRegistry;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+/** {@link BindingRegistry} implementation that forwards lookups and bindings to a JNDI {@link Context}.
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class JndiBindingRegistry implements BindingRegistry
+{
+ private Context context; // JNDI context used by every operation; only bindToJndi checks it for null
+ // Wraps a caller-supplied JNDI context.
+ public JndiBindingRegistry(Context context)
+ {
+ this.context = context;
+ }
+ // Uses a freshly created InitialContext; any failure creating it propagates to the caller.
+ public JndiBindingRegistry() throws Exception
+ {
+ this.context = new InitialContext();
+ }
+ // Returns whatever the context has bound under name; a NamingException is rethrown wrapped in a RuntimeException.
+ public Object lookup(String name)
+ {
+ try
+ {
+ return context.lookup(name);
+ }
+ catch (NamingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ // Delegates to bindToJndi and returns its result; a NamingException is rethrown wrapped in a RuntimeException.
+ public boolean bind(String name, Object obj)
+ {
+ try
+ {
+ return bindToJndi(name, obj);
+ }
+ catch (NamingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ // Removes name from the context; a NamingException raised by the context is ignored.
+ public void unbind(String name)
+ {
+ try
+ {
+ context.unbind(name);
+ }
+ catch (NamingException e)
+ { // NOTE(review): failure is swallowed without logging - confirm best-effort unbind is intended
+ }
+ }
+ // Closes the underlying context; a NamingException raised by the context is ignored.
+ public void close()
+ {
+ try
+ {
+ context.close();
+ }
+ catch (NamingException e)
+ { // NOTE(review): failure is swallowed without logging - confirm best-effort close is intended
+ }
+ }
+
+ // Binds objectToBind under jndiName unless a lookup of jndiName succeeds first. Returns false when the name is already bound, true otherwise (also true, with nothing bound, when context is null).
+ private boolean bindToJndi(final String jndiName, final Object objectToBind) throws NamingException
+ {
+ if (context != null)
+ {
+ String parentContext;
+ String jndiNameInContext;
+ int sepIndex = jndiName.lastIndexOf('/'); // split the name at its last '/' into parent path and leaf name
+ if (sepIndex == -1)
+ {
+ parentContext = ""; // no '/' in the name, so the parent path is empty
+ }
+ else
+ {
+ parentContext = jndiName.substring(0, sepIndex);
+ }
+ jndiNameInContext = jndiName.substring(sepIndex + 1); // leaf name (the whole name when sepIndex is -1)
+ try
+ {
+ context.lookup(jndiName); // probe: a successful lookup means the name is already taken
+
+ //JMSServerManagerImpl.log.warn("Binding for " + jndiName + " already exists");
+ return false;
+ }
+ catch (Throwable e) // NOTE(review): any Throwable, not only a name-not-found failure, is treated as "not bound" - confirm intended
+ {
+ // OK - the lookup failed, so fall through and create the binding
+ }
+
+ Context c = org.hornetq.utils.JNDIUtil.createContext(context, parentContext); // presumably resolves or creates the sub-context for the parent path - verify against JNDIUtil
+
+ c.rebind(jndiNameInContext, objectToBind);
+ }
+ return true;
+ }
+}
Added: trunk/src/main/org/hornetq/core/registry/MapBindingRegistry.java
===================================================================
--- trunk/src/main/org/hornetq/core/registry/MapBindingRegistry.java (rev 0)
+++ trunk/src/main/org/hornetq/core/registry/MapBindingRegistry.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,32 @@
+package org.hornetq.core.registry;
+
+import org.hornetq.spi.BindingRegistry;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+
+/** {@link BindingRegistry} implementation that keeps its bindings in an in-memory concurrent map.
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class MapBindingRegistry implements BindingRegistry
+{
+ protected ConcurrentHashMap<String, Object> registry = new ConcurrentHashMap<String, Object>(); // NOTE(review): this file imports ConcurrentHashMap from org.jboss.netty.util.internal, not java.util.concurrent - confirm that is intended
+ // Returns the map's value for name.
+ public Object lookup(String name)
+ {
+ return registry.get(name);
+ }
+ // Stores obj under name via putIfAbsent; returns true only when putIfAbsent reports no previous value for name.
+ public boolean bind(String name, Object obj)
+ {
+ return registry.putIfAbsent(name, obj) == null;
+ }
+ // Removes the entry for name from the map.
+ public void unbind(String name)
+ {
+ registry.remove(name);
+ }
+ // Empty body: this registry does nothing on close.
+ public void close()
+ {
+ }
+}
Added: trunk/src/main/org/hornetq/core/server/embedded/EmbeddedHornetQ.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/embedded/EmbeddedHornetQ.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/embedded/EmbeddedHornetQ.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,87 @@
+package org.hornetq.core.server.embedded;
+
+import org.hornetq.core.config.impl.FileConfiguration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+
+import javax.management.MBeanServer;
+
+/**
+ * Helper class to simplify bootstrap of HornetQ server. Bootstraps from classpath-based config files.
+ * Setters are meant to be called before {@link #start()}, since the server is constructed from their values in {@link #initStart()}.
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class EmbeddedHornetQ
+{
+ protected HornetQSecurityManager securityManager; // replaced by a HornetQSecurityManagerImpl in initStart() when still null
+ protected FileConfiguration configuration = new FileConfiguration();
+ protected HornetQServer hornetQServer; // no initializer; assigned in initStart()
+ protected MBeanServer mbeanServer; // optional; selects which HornetQServerImpl constructor initStart() uses
+
+ /**
+ * Classpath resource for hornetq server config. Defaults to 'hornetq-configuration.xml'.
+ *
+ * @param filename resource path handed to the configuration's setConfigurationUrl
+ */
+ public void setConfigResourcePath(String filename)
+ {
+ configuration.setConfigurationUrl(filename);
+ }
+
+ /**
+ * Set the hornetq security manager. This defaults to org.hornetq.spi.core.security.HornetQSecurityManagerImpl
+ *
+ * @param securityManager the security manager passed to the server constructor
+ */
+ public void setSecurityManager(HornetQSecurityManager securityManager)
+ {
+ this.securityManager = securityManager;
+ }
+
+ /**
+ * Use this mbean server to register management beans. If not set, no mbeans will be registered.
+ *
+ * @param mbeanServer the MBeanServer passed to the server constructor when non-null
+ */
+ public void setMbeanServer(MBeanServer mbeanServer)
+ {
+ this.mbeanServer = mbeanServer;
+ }
+ // Returns the server field; it has no initializer and is assigned in initStart().
+ public HornetQServer getHornetQServer()
+ {
+ return hornetQServer;
+ }
+ // Constructs the server via initStart() and then starts it.
+ public void start() throws Exception
+ {
+ initStart();
+ hornetQServer.start();
+
+ }
+ // Starts the configuration, applies the default security manager if none was set, and constructs (but does not start) the server.
+ protected void initStart() throws Exception
+ {
+ configuration.start();
+ if (securityManager == null)
+ {
+ securityManager = new HornetQSecurityManagerImpl();
+ }
+ if (mbeanServer == null)
+ {
+ hornetQServer = new HornetQServerImpl(configuration, securityManager);
+ }
+ else
+ {
+ hornetQServer = new HornetQServerImpl(configuration, mbeanServer, securityManager);
+ }
+ }
+ // Stops the server. NOTE(review): hornetQServer is only assigned in initStart(), so calling this before start() would dereference null - confirm callers.
+ public void stop() throws Exception
+ {
+ hornetQServer.stop();
+ }
+}
Added: trunk/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java
===================================================================
--- trunk/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java (rev 0)
+++ trunk/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,37 @@
+package org.hornetq.integration.spring;
+
+import org.hornetq.spi.BindingRegistry;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+
+/** {@link BindingRegistry} implementation that stores and resolves bindings through a Spring {@link ConfigurableBeanFactory}.
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class SpringBindingRegistry implements BindingRegistry
+{
+ private ConfigurableBeanFactory factory; // bean factory that holds every binding
+ // Wraps the given bean factory.
+ public SpringBindingRegistry(ConfigurableBeanFactory factory)
+ {
+ this.factory = factory;
+ }
+ // Resolves name as a bean in the factory and returns what getBean yields.
+ public Object lookup(String name)
+ {
+ return factory.getBean(name);
+ }
+ // Registers obj as a singleton named name and always returns true. NOTE(review): how registerSingleton treats an already-registered name depends on the factory - verify.
+ public boolean bind(String name, Object obj)
+ {
+ factory.registerSingleton(name, obj);
+ return true;
+ }
+ // Empty body: nothing is removed from the bean factory.
+ public void unbind(String name)
+ {
+ }
+ // Empty body: this registry does nothing on close.
+ public void close()
+ {
+ }
+}
Added: trunk/src/main/org/hornetq/integration/spring/SpringJmsBootstrap.java
===================================================================
--- trunk/src/main/org/hornetq/integration/spring/SpringJmsBootstrap.java (rev 0)
+++ trunk/src/main/org/hornetq/integration/spring/SpringJmsBootstrap.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,20 @@
+package org.hornetq.integration.spring;
+
+import org.hornetq.jms.server.embedded.EmbeddedJMS;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+
+/** {@link EmbeddedJMS} variant that, when handed its bean factory, installs a {@link SpringBindingRegistry} as the registry.
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class SpringJmsBootstrap extends EmbeddedJMS implements BeanFactoryAware
+{
+ public void setBeanFactory(BeanFactory beanFactory) throws BeansException // BeanFactoryAware callback; assigns the registry field
+ {
+ System.out.println("SpringJmsBootstrap setBeanFactory..."); // NOTE(review): trace goes straight to stdout rather than a logger - confirm intended
+ registry = new SpringBindingRegistry((ConfigurableBeanFactory)beanFactory); // unchecked cast: fails with ClassCastException if the factory is not a ConfigurableBeanFactory
+ }
+}
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-08-31 16:09:43 UTC (rev 9618)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.spi.BindingRegistry;
/**
* The JMS Management interface.
@@ -56,46 +57,46 @@
* @throws Exception
* if problems were encountered creating the queue.
*/
- boolean createQueue(boolean storeConfig, String queueName, String selectorString, boolean durable, String ...jndi) throws Exception;
+ boolean createQueue(boolean storeConfig, String queueName, String selectorString, boolean durable, String ...bindings) throws Exception;
- boolean addTopicToJndi(final String topicName, final String jndiBinding) throws Exception;
+ boolean addTopicToJndi(final String topicName, final String binding) throws Exception;
- boolean addQueueToJndi(final String queueName, final String jndiBinding) throws Exception;
+ boolean addQueueToJndi(final String queueName, final String binding) throws Exception;
- boolean addConnectionFactoryToJNDI(final String name, final String jndiBinding) throws Exception;
+ boolean addConnectionFactoryToJNDI(final String name, final String binding) throws Exception;
/**
* Creates a JMS Topic
*
* @param topicName
* the name of the topic
- * @param jndiBinding
- * the name of the binding for JNDI
+ * @param bindings
+ * the names of the bindings for JNDI or BindingRegistry
* @return true if the topic was created or if it existed and was added to
* JNDI
* @throws Exception
* if a problem occurred creating the topic
*/
- boolean createTopic(boolean storeConfig, String topicName, String ... jndi) throws Exception;
+ boolean createTopic(boolean storeConfig, String topicName, String ... bindings) throws Exception;
/**
- * Remove the topic from JNDI.
+ * Remove the topic from JNDI or BindingRegistry.
* Calling this method does <em>not</em> destroy the destination.
*
* @param name
- * the name of the destination to remove from JNDI
+ * the name of the destination to remove from JNDI or BindingRegistry
* @return true if removed
* @throws Exception
* if a problem occurred removing the destination
*/
- boolean removeTopicFromJNDI(String name, String jndi) throws Exception;
+ boolean removeTopicFromJNDI(String name, String binding) throws Exception;
/**
- * Remove the topic from JNDI.
+ * Remove the topic from JNDI or BindingRegistry.
* Calling this method does <em>not</em> destroy the destination.
*
* @param name
- * the name of the destination to remove from JNDI
+ * the name of the destination to remove from JNDI or BindingRegistry
* @return true if removed
* @throws Exception
* if a problem occurred removing the destination
@@ -103,35 +104,35 @@
boolean removeTopicFromJNDI(String name) throws Exception;
/**
- * Remove the queue from JNDI.
+ * Remove the queue from JNDI or BindingRegistry.
* Calling this method does <em>not</em> destroy the destination.
*
* @param name
- * the name of the destination to remove from JNDI
+ * the name of the destination to remove from JNDI or BindingRegistry
* @return true if removed
* @throws Exception
* if a problem occurred removing the destination
*/
- boolean removeQueueFromJNDI(String name, String jndi) throws Exception;
+ boolean removeQueueFromJNDI(String name, String binding) throws Exception;
/**
- * Remove the queue from JNDI.
+ * Remove the queue from JNDI or BindingRegistry.
* Calling this method does <em>not</em> destroy the destination.
*
* @param name
- * the name of the destination to remove from JNDI
+ * the name of the destination to remove from JNDI or BindingRegistry
* @return true if removed
* @throws Exception
* if a problem occurred removing the destination
*/
boolean removeQueueFromJNDI(String name) throws Exception;
- boolean removeConnectionFactoryFromJNDI(String name, String jndi) throws Exception;
+ boolean removeConnectionFactoryFromJNDI(String name, String binding) throws Exception;
boolean removeConnectionFactoryFromJNDI(String name) throws Exception;
/**
- * destroys a queue and removes it from JNDI
+ * destroys a queue and removes it from JNDI or BindingRegistry
*
* @param name
* the name of the queue to destroy
@@ -148,7 +149,7 @@
String[] getJNDIOnConnectionFactory(String factoryName);
/**
- * destroys a topic and removes it from JNDI
+ * destroys a topic and removes it from JNDI or BindingRegistry
*
* @param name
* the name of the topic to destroy
@@ -158,37 +159,37 @@
*/
boolean destroyTopic(String name) throws Exception;
- void createConnectionFactory(String name, String discoveryAddress, int discoveryPort, String ... jndiBindings) throws Exception;
+ void createConnectionFactory(String name, String discoveryAddress, int discoveryPort, String ... bindings) throws Exception;
void createConnectionFactory(String name,
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- String ... jndiBindings) throws Exception;
+ String ... bindings) throws Exception;
void createConnectionFactory(String name,
TransportConfiguration liveTC,
TransportConfiguration backupTC,
- String ... jndiBindings) throws Exception;
+ String ... bindings) throws Exception;
- void createConnectionFactory(String name, TransportConfiguration liveTC, String ... jndiBindings) throws Exception;
+ void createConnectionFactory(String name, TransportConfiguration liveTC, String ... bindings) throws Exception;
void createConnectionFactory(String name,
String clientID,
String discoveryAddress,
int discoveryPort,
- String ... jndiBindings) throws Exception;
+ String ... bindings) throws Exception;
void createConnectionFactory(String name,
String clientID,
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- String ... jndiBindings) throws Exception;
+ String ... bindings) throws Exception;
void createConnectionFactory(String name,
String clientID,
TransportConfiguration liveTC,
TransportConfiguration backupTC,
- String ... jndiBindings) throws Exception;
+ String ... bindings) throws Exception;
- void createConnectionFactory(String name, String clientID, TransportConfiguration liveTC, String ... jndiBindings) throws Exception;
+ void createConnectionFactory(String name, String clientID, TransportConfiguration liveTC, String ... bindings) throws Exception;
void createConnectionFactory(String name,
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
@@ -221,7 +222,7 @@
boolean failoverOnInitialConnection,
boolean failoverOnServerShutdown,
String groupId,
- String ... jndiBindings) throws Exception;
+ String ... bindings) throws Exception;
void createConnectionFactory(String name,
String localBindAdress,
@@ -258,9 +259,9 @@
boolean failoverOnInitialConnection,
boolean failoverOnServerShutdown,
String groupId,
- String ... jndiBindings) throws Exception;
+ String ... bindings) throws Exception;
- void createConnectionFactory(boolean storeConfig, ConnectionFactoryConfiguration cfConfig, String... jndiBindings) throws Exception;
+ void createConnectionFactory(boolean storeConfig, ConnectionFactoryConfiguration cfConfig, String... bindings) throws Exception;
/**
* destroys a connection factory.
@@ -294,4 +295,13 @@
void addSecurity(String addressMatch, Set<Role> roles);
Set<Role> getSecurity(final String addressMatch);
+
+ BindingRegistry getRegistry();
+
+ /**
+ * Set this property if you want something other than JNDI for your registry
+ *
+ * @param registry
+ */
+ void setRegistry(BindingRegistry registry);
}
Added: trunk/src/main/org/hornetq/jms/server/embedded/EmbeddedJMS.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/embedded/EmbeddedJMS.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/server/embedded/EmbeddedJMS.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,77 @@
+package org.hornetq.jms.server.embedded;
+
+import org.hornetq.core.registry.MapBindingRegistry;
+import org.hornetq.core.server.embedded.EmbeddedHornetQ;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.BindingRegistry;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple bootstrap class that parses hornetq config files (server and jms and security) and starts
+ * a HornetQServer instance and populates it with configured JMS endpoints.
+ *
+ * JMS Endpoints are registered with a simple MapBindingRegistry. If you want to use a different registry
+ * you must set the registry property of this class
+ *
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class EmbeddedJMS extends EmbeddedHornetQ
+{
+ protected JMSServerManagerImpl serverManager;
+ protected BindingRegistry registry;
+ protected String jmsConfigResourcePath;
+
+ /**
+ * Classpath resource where JMS config file is. Defaults to 'hornetq-jms.xml'
+ *
+ * @param jmsConfigResourcePath
+ */
+ public void setJmsConfigResourcePath(String jmsConfigResourcePath)
+ {
+ this.jmsConfigResourcePath = jmsConfigResourcePath;
+ }
+
+ public BindingRegistry getRegistry()
+ {
+ return registry;
+ }
+
+ public void setRegistry(BindingRegistry registry)
+ {
+ this.registry = registry;
+ }
+
+ /**
+ * Looks up a registered object in the registry, e.g. a ConnectionFactory. This is a convenience method.
+ *
+ * @param name
+ * @return
+ */
+ public Object lookup(String name)
+ {
+ return serverManager.getRegistry().lookup(name);
+ }
+
+ public void start() throws Exception
+ {
+ System.out.println("EmbeddedJMS starting...");
+ super.initStart();
+ if (jmsConfigResourcePath == null) serverManager = new JMSServerManagerImpl(hornetQServer);
+ else serverManager = new JMSServerManagerImpl(hornetQServer, jmsConfigResourcePath);
+ if (registry == null)
+ {
+ registry = new MapBindingRegistry();
+ }
+ serverManager.setRegistry(registry);
+ serverManager.start();
+ }
+
+ public void stop() throws Exception
+ {
+ serverManager.stop();
+ }
+
+}
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-08-31 16:09:43 UTC (rev 9618)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -41,6 +41,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.registry.JndiBindingRegistry;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
@@ -65,6 +66,7 @@
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.management.JMSManagementService;
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
+import org.hornetq.spi.BindingRegistry;
import org.hornetq.utils.TimeAndCounterIDGenerator;
/**
@@ -89,6 +91,8 @@
private static final String REJECT_FILTER = "__HQX=-1";
+ private BindingRegistry registry;
+
/**
* the context to bind to
*/
@@ -223,9 +227,10 @@
return;
}
- if (!contextSet)
+ if (registry == null)
{
- context = new InitialContext();
+ if (!contextSet) context = new InitialContext();
+ registry = new JndiBindingRegistry(context);
}
deploymentManager = new FileDeploymentManager(server.getConfiguration().getFileDeployerScanPeriod());
@@ -286,9 +291,9 @@
topicJNDI.clear();
topics.clear();
- if (context != null)
+ if (registry != null)
{
- context.close();
+ registry.close();
}
// it could be null if a backup
@@ -311,6 +316,17 @@
// JMSServerManager implementation -------------------------------
+
+ public BindingRegistry getRegistry()
+ {
+ return registry;
+ }
+
+ public void setRegistry(BindingRegistry registry)
+ {
+ this.registry = registry;
+ }
+
public HornetQServer getHornetQServer()
{
return server;
@@ -1300,18 +1316,11 @@
{
return false;
}
- if (context != null)
+ if (registry != null)
{
for (String jndiBinding : jndiBindings)
{
- try
- {
- context.unbind(jndiBinding);
- }
- catch (NameNotFoundException e)
- {
- // this is ok.
- }
+ registry.unbind(jndiBinding);
}
}
connectionFactoryJNDI.remove(name);
@@ -1379,35 +1388,9 @@
private boolean bindToJndi(final String jndiName, final Object objectToBind) throws NamingException
{
- if (context != null)
+ if (registry != null)
{
- String parentContext;
- String jndiNameInContext;
- int sepIndex = jndiName.lastIndexOf('/');
- if (sepIndex == -1)
- {
- parentContext = "";
- }
- else
- {
- parentContext = jndiName.substring(0, sepIndex);
- }
- jndiNameInContext = jndiName.substring(sepIndex + 1);
- try
- {
- context.lookup(jndiName);
-
- JMSServerManagerImpl.log.warn("Binding for " + jndiName + " already exists");
- return false;
- }
- catch (Throwable e)
- {
- // OK
- }
-
- Context c = org.hornetq.utils.JNDIUtil.createContext(context, parentContext);
-
- c.rebind(jndiNameInContext, objectToBind);
+ registry.bind(jndiName, objectToBind);
}
return true;
}
@@ -1482,20 +1465,13 @@
*/
private void unbindJNDI(Map<String, List<String>> param)
{
- if (context != null)
+ if (registry != null)
{
for (List<String> elementList : param.values())
{
for (String key : elementList)
{
- try
- {
- context.unbind(key);
- }
- catch (Exception e)
- {
- log.warn("Impossible to unbind key " + key + " from JNDI", e);
- }
+ registry.unbind(key);
}
}
}
@@ -1621,13 +1597,13 @@
{
keys.remove(name);
}
- if (context != null)
+ if (registry != null)
{
Iterator<String> iter = jndiBindings.iterator();
while (iter.hasNext())
{
String jndiBinding = iter.next();
- context.unbind(jndiBinding);
+ registry.unbind(jndiBinding);
iter.remove();
}
}
@@ -1647,7 +1623,7 @@
if (jndiBindings.remove(jndi))
{
- context.unbind(jndi);
+ registry.unbind(jndi);
return true;
}
else
Added: trunk/src/main/org/hornetq/spi/BindingRegistry.java
===================================================================
--- trunk/src/main/org/hornetq/spi/BindingRegistry.java (rev 0)
+++ trunk/src/main/org/hornetq/spi/BindingRegistry.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,15 @@
+package org.hornetq.spi;
+
+/**
+ * Abstract interface for a registry to store endpoints like connection factories into.
+ *
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public interface BindingRegistry
+{
+ Object lookup(String name);
+ boolean bind(String name, Object obj);
+ void unbind(String name);
+ void close();
+}
Added: trunk/tests/config/spring-hornetq-config.xml
===================================================================
--- trunk/tests/config/spring-hornetq-config.xml (rev 0)
+++ trunk/tests/config/spring-hornetq-config.xml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,34 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+ <persistence-enabled>false</persistence-enabled>
+ <security-enabled>false</security-enabled>
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="in-vm">
+ <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <acceptors>
+ <acceptor name="in-vm">
+ <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+ <!--
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings> -->
+
+</configuration>
Added: trunk/tests/config/spring-hornetq-jms.xml
===================================================================
--- trunk/tests/config/spring-hornetq-jms.xml (rev 0)
+++ trunk/tests/config/spring-hornetq-jms.xml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,19 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="in-vm"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
Added: trunk/tests/config/spring-jms-beans.xml
===================================================================
--- trunk/tests/config/spring-jms-beans.xml (rev 0)
+++ trunk/tests/config/spring-jms-beans.xml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
+
+ <bean id="EmbeddedJms" class="org.hornetq.integration.spring.SpringJmsBootstrap" init-method="start">
+ <property name="configResourcePath" value="spring-hornetq-config.xml"/>
+ <property name="jmsConfigResourcePath" value="spring-hornetq-jms.xml"/>
+ </bean>
+
+ <bean id="listener" class="org.hornetq.tests.integration.spring.ExampleListener"/>
+
+ <bean id="MessageSender" class="org.hornetq.tests.integration.spring.MessageSender">
+ <property name="connectionFactory" ref="ConnectionFactory"/>
+ <property name="destination" ref="/queue/exampleQueue"/>
+ </bean>
+
+ <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
+ <property name="connectionFactory" ref="ConnectionFactory"/>
+ <property name="destination" ref="/queue/exampleQueue"/>
+ <property name="messageListener" ref="listener"/>
+ </bean>
+
+
+</beans>
+
\ No newline at end of file
Added: trunk/tests/jms-tests/Jms-tests.iml
===================================================================
--- trunk/tests/jms-tests/Jms-tests.iml (rev 0)
+++ trunk/tests/jms-tests/Jms-tests.iml 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" inherit-compiler-output="true">
+ <exclude-output />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../../thirdparty/org/jboss/naming/lib/jnpserver.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="library" name="lib7" level="project" />
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="library" name="jars" level="project" />
+ <orderEntry type="library" name="lib4" level="project" />
+ <orderEntry type="library" name="lib" level="project" />
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../../thirdparty/junit/lib/junit.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="module-library">
+ <library>
+ <CLASSES>
+ <root url="jar://$MODULE_DIR$/../../thirdparty/org/jboss/integration/lib/jboss-transaction-spi.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ </orderEntry>
+ <orderEntry type="library" name="lib2" level="project" />
+ <orderEntry type="library" name="lib6" level="project" />
+ <orderEntry type="module" module-name="Hornetq" />
+ </component>
+</module>
+
Added: trunk/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,28 @@
+package org.hornetq.tests.integration.spring;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class ExampleListener implements MessageListener
+{
+ public static String lastMessage = null;
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ lastMessage = ((TextMessage)message).getText();
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ System.out.println("MESSAGE RECEIVED: " + lastMessage);
+ }
+}
Added: trunk/tests/src/org/hornetq/tests/integration/spring/MessageSender.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/spring/MessageSender.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/spring/MessageSender.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,56 @@
+package org.hornetq.tests.integration.spring;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class MessageSender
+{
+ private ConnectionFactory connectionFactory;
+ private Destination destination;
+
+ public ConnectionFactory getConnectionFactory()
+ {
+ return connectionFactory;
+ }
+
+ public void setConnectionFactory(ConnectionFactory connectionFactory)
+ {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public Destination getDestination()
+ {
+ return destination;
+ }
+
+ public void setDestination(Destination destination)
+ {
+ this.destination = destination;
+ }
+
+ public void send(String msg)
+ {
+ try
+ {
+ Connection conn = connectionFactory.createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage(msg);
+ producer.send(message);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+}
Added: trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2010-08-31 16:29:21 UTC (rev 9619)
@@ -0,0 +1,24 @@
+package org.hornetq.tests.integration.spring;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class SpringIntegrationTest extends TestCase
+{
+ public void testSpring() throws Exception
+ {
+ System.out.println("Creating bean factory...");
+ ApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"spring-jms-beans.xml"});
+ MessageSender sender = (MessageSender)context.getBean("MessageSender");
+ System.out.println("Sending message...");
+ sender.send("Hello world");
+ Thread.sleep(100);
+ Assert.assertEquals(ExampleListener.lastMessage, "Hello world");
+ }
+}
13 years, 7 months
JBoss hornetq SVN: r9618 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/client/impl and 3 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-31 12:09:43 -0400 (Tue, 31 Aug 2010)
New Revision: 9618
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
fixed failover tests
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-08-31 16:01:47 UTC (rev 9617)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-08-31 16:09:43 UTC (rev 9618)
@@ -78,6 +78,8 @@
public static final long DEFAULT_MAX_RETRY_INTERVAL = 2000;
public static final int DEFAULT_RECONNECT_ATTEMPTS = 0;
+
+ public static final int INITIAL_CONNECT_ATTEMPTS = 1;
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 16:01:47 UTC (rev 9617)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 16:09:43 UTC (rev 9618)
@@ -160,6 +160,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
+ final int initialConnectAttempts,
final boolean failoverOnInitialConnection,
final ExecutorService threadPool,
final ScheduledExecutorService scheduledThreadPool,
@@ -203,7 +204,7 @@
// Get the connection
- getConnectionWithRetry(reconnectAttempts);
+ getConnectionWithRetry(initialConnectAttempts);
if (connection == null && failoverOnInitialConnection)
{
@@ -221,7 +222,7 @@
transportParams = connectorConfig.getParams();
- getConnectionWithRetry(reconnectAttempts);
+ getConnectionWithRetry(initialConnectAttempts);
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-31 16:01:47 UTC (rev 9617)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-31 16:09:43 UTC (rev 9618)
@@ -145,6 +145,8 @@
private int reconnectAttempts;
+ private int initialConnectAttempts;
+
private boolean failoverOnInitialConnection;
private int initialMessagePacketSize;
@@ -370,6 +372,8 @@
reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+ initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
+
failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
failoverOnServerShutdown = HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
@@ -498,6 +502,7 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
+ initialConnectAttempts,
failoverOnInitialConnection,
threadPool,
scheduledThreadPool,
@@ -563,6 +568,7 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
+ initialConnectAttempts,
failoverOnInitialConnection,
threadPool,
scheduledThreadPool,
@@ -900,6 +906,12 @@
this.reconnectAttempts = reconnectAttempts;
}
+ public void setInitialConnectAttempts(int initialConnectAttempts)
+ {
+ checkWrite();
+ this.initialConnectAttempts = initialConnectAttempts;
+ }
+
public synchronized boolean isFailoverOnInitialConnection()
{
return this.failoverOnInitialConnection;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-31 16:01:47 UTC (rev 9617)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-31 16:09:43 UTC (rev 9618)
@@ -163,6 +163,8 @@
private final String journalDir;
private final String largeMessagesDirectory;
+
+ private boolean journalLoaded = false;
// Persisted core configuration
@@ -978,7 +980,7 @@
{
messageJournal.runDirectJournalBlast();
}
-
+ journalLoaded = true;
return info;
}
@@ -1114,13 +1116,18 @@
return;
}
- // Must call close to make sure last id is persisted
- idGenerator.close();
+ if (journalLoaded)
+ {
+ // Must call close to make sure last id is persisted
+ idGenerator.close();
+ }
bindingsJournal.stop();
messageJournal.stop();
+ journalLoaded = false;
+
started = false;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-31 16:01:47 UTC (rev 9617)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-08-31 16:09:43 UTC (rev 9618)
@@ -599,7 +599,8 @@
configuration.setBackup(false);
clusterManager.activate();
-
+ //todo fix this problem with the journal
+ Thread.sleep(200);
initialisePart2();
log.info("Back Up Server is now live");
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-08-31 16:01:47 UTC (rev 9617)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-08-31 16:09:43 UTC (rev 9618)
@@ -13,10 +13,7 @@
package org.hornetq.tests.integration.cluster.failover;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -96,18 +93,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -123,7 +108,7 @@
producer.send(message);
}
- fail(session, latch);
+ fail(session);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -171,18 +156,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 10;
@@ -215,7 +188,7 @@
if (i == 5)
{
- fail(session, latch);
+ fail(session);
}
}
@@ -260,7 +233,7 @@
/** It doesn't fail, but it restart both servers, live and backup, and the data should be received after the restart,
* and the servers should be able to connect without any problems. */
- public void testRestartServers() throws Exception
+ public void _testRestartServers() throws Exception
{
ServerLocator locator = getServerLocator();
@@ -392,21 +365,26 @@
Assert.assertEquals(0, sf.numConnections());
}
- /**
- * @param session
- * @param latch
- * @throws InterruptedException
- */
- private void fail(final ClientSession session, final CountDownLatch latch) throws Exception
+
+ private void fail(final ClientSession... sessions) throws Exception
{
+ final CountDownLatch latch = new CountDownLatch(sessions.length);
- //RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+ class MyListener extends BaseListener
+ {
+ public void connectionFailed(final HornetQException me)
+ {
+ latch.countDown();
+ }
- // Simulate failure on connection
- //conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ }
+ for (ClientSession session : sessions)
+ {
+ session.addFailureListener(new MyListener());
+ }
server0Service.stop();
- // Wait to be informed of failure
+ // Wait to be informed of failure
boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
@@ -427,19 +405,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
-
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -455,7 +420,7 @@
producer.send(message);
}
- fail(session, latch);
+ fail(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -506,19 +471,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
-
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -534,7 +486,7 @@
producer.send(message);
}
- fail(session, latch);
+ fail(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -593,18 +545,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -622,7 +562,7 @@
session.commit();
- fail(session, latch);
+ fail(session);
// committing again should work since didn't send anything since last commit
@@ -680,18 +620,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
// create a consumer and start the session before failover
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -717,7 +645,7 @@
Assert.assertFalse(session.isRollbackOnly());
- fail(session, latch);
+ fail(session);
session.commit();
@@ -775,18 +703,6 @@
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session1.addFailureListener(new MyListener());
-
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -823,7 +739,7 @@
message.acknowledge();
}
- fail(session2, latch);
+ fail(session2);
Assert.assertTrue(session2.isRollbackOnly());
@@ -864,18 +780,6 @@
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session1.addFailureListener(new MyListener());
-
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -916,7 +820,7 @@
consumer.close();
- fail(session2, latch);
+ fail(session2);
Assert.assertFalse(session2.isRollbackOnly());
@@ -967,18 +871,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -996,7 +888,7 @@
producer.send(message);
}
- fail(session, latch);
+ fail(session);
try
{
@@ -1043,18 +935,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1074,7 +954,7 @@
session.end(xid, XAResource.TMSUCCESS);
- fail(session, latch);
+ fail(session);
try
{
@@ -1122,18 +1002,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1155,7 +1023,7 @@
session.prepare(xid);
- fail(session, latch);
+ fail(session);
try
{
@@ -1202,18 +1070,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1237,7 +1093,7 @@
session.commit(xid, false);
- fail(session, latch);
+ fail(session);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1295,18 +1151,6 @@
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session1.addFailureListener(new MyListener());
-
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1347,7 +1191,7 @@
message.acknowledge();
}
- fail(session2, latch);
+ fail(session2);
try
{
@@ -1386,18 +1230,6 @@
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session1.addFailureListener(new MyListener());
-
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1440,17 +1272,8 @@
session2.end(xid, XAResource.TMSUCCESS);
- RemotingConnection conn = ((ClientSessionInternal)session2).getConnection();
+ fail(session2);
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
-
try
{
session2.prepare(xid);
@@ -1488,18 +1311,6 @@
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session1.addFailureListener(new MyListener());
-
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1544,7 +1355,7 @@
session2.prepare(xid);
- fail(session2, latch);
+ fail(session2);
try
{
@@ -1572,38 +1383,22 @@
{
ServerLocator locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnServerShutdown(true);
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sendAndConsume(sf, true);
- final CountDownLatch latch = new CountDownLatch(1);
+ fail(session);
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
- conn.addFailureListener(new MyListener());
-
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
-
session.close();
+ waitForBackup(5);
+
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- session = sendAndConsume(sf, false);
+ session = sendAndConsume(sf, true);
session.close();
@@ -1631,18 +1426,6 @@
Map<ClientSession, List<ClientConsumer>> sessionConsumerMap = new HashMap<ClientSession, List<ClientConsumer>>();
- class MyListener extends BaseListener
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- List<MyListener> listeners = new ArrayList<MyListener>();
-
for (int i = 0; i < numSessions; i++)
{
ClientSession session = sf.createSession(true, true);
@@ -1680,19 +1463,12 @@
producer.send(message);
}
- RemotingConnection conn = ((ClientSessionInternal)sendSession).getConnection();
+ Set<ClientSession> sessionSet = sessionConsumerMap.keySet();
+ ClientSession[] sessions = new ClientSession[sessionSet.size()];
+ sessionSet.toArray(sessions);
+ fail(sessions);
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- // Wait to be informed of failure
-
- for (MyListener listener : listeners)
- {
- boolean ok = listener.latch.await(1000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
- }
-
for (ClientSession session : sessionConsumerMap.keySet())
{
session.start();
@@ -1748,18 +1524,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1790,7 +1554,7 @@
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
}
- fail(session, latch);
+ fail(session);
for (int i = 0; i < numMessages; i++)
{
@@ -1834,18 +1598,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1876,7 +1628,7 @@
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
}
- fail(session, latch);
+ fail(session);
// Should get the same ones after failover since we didn't ack
@@ -1914,24 +1666,14 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sf.createSession(true, true, 0);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -1964,7 +1706,7 @@
message.acknowledge();
}
- fail(session, latch);
+ fail(session);
// Send some more
@@ -2045,18 +1787,7 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, durable);
}
- final CountDownLatch latch = new CountDownLatch(1);
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 100;
@@ -2065,7 +1796,7 @@
session.start();
- fail(session, latch);
+ fail(session);
for (int i = 0; i < numMessages; i++)
{
@@ -2103,35 +1834,23 @@
public void testForceBlockingReturn() throws Exception
{
ServerLocator locator = getServerLocator();
-
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setFailoverOnServerShutdown(true);
+ locator.setReconnectAttempts(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
// Add an interceptor to delay the send method so we can get time to cause failover before it returns
server0Service.getRemotingService().addInterceptor(new DelayInterceptor());
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
- locator.setFailoverOnServerShutdown(true);
- locator.setReconnectAttempts(-1);
+
final ClientSession session = sf.createSession(true, true, 0);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
class Sender extends Thread
@@ -2162,7 +1881,7 @@
Thread.sleep(500);
- fail(session, latch);
+ fail(session);
sender.join();
@@ -2195,18 +1914,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
final int numMessages = 100;
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -2281,7 +1988,7 @@
Thread.sleep(500);
- fail(session, latch);
+ fail(session);
committer.join();
@@ -2360,18 +2067,6 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener extends BaseListener
- {
- public void connectionFailed(final HornetQException me)
- {
- latch.countDown();
- }
- }
-
- session.addFailureListener(new MyListener());
-
final int numMessages = 100;
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -2430,7 +2125,7 @@
Thread.sleep(500);
- fail(session, latch);
+ fail(session);
committer.join();
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-31 16:01:47 UTC (rev 9617)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-08-31 16:09:43 UTC (rev 9618)
@@ -219,6 +219,32 @@
return sf;
}
+ protected void waitForBackup(long seconds)
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while(!server1Service.isInitialised())
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ if(server1Service.isInitialised())
+ {
+ break;
+ }
+ else if(System.currentTimeMillis() > (time + toWait))
+ {
+ fail("backup server never started");
+ }
+ }
+ System.out.println("FailoverTestBase.waitForBackup");
+ }
+
protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
{
if (live)
@@ -293,7 +319,7 @@
protected ServerLocatorInternal getServerLocator() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true));
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false));
return (ServerLocatorInternal) locator;
}
13 years, 7 months
JBoss hornetq SVN: r9617 - in branches/Branch_2_1: docs/user-manual/en and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-31 12:01:47 -0400 (Tue, 31 Aug 2010)
New Revision: 9617
Modified:
branches/Branch_2_1/docs/user-manual/en/embedding-hornetq.xml
branches/Branch_2_1/merge-activity.txt
Log:
merging -r9615:9616 from trunk
Modified: branches/Branch_2_1/docs/user-manual/en/embedding-hornetq.xml
===================================================================
--- branches/Branch_2_1/docs/user-manual/en/embedding-hornetq.xml 2010-08-31 15:46:14 UTC (rev 9616)
+++ branches/Branch_2_1/docs/user-manual/en/embedding-hornetq.xml 2010-08-31 16:01:47 UTC (rev 9617)
@@ -66,18 +66,18 @@
<para>You need to instantiate and start HornetQ server. The class <literal
>org.hornetq.api.core.server.HornetQ</literal> has a few static methods for creating
servers with common configurations.</para>
- <programlisting>import org.hornetq.api.core.server.HornetQ;
-import org.hornetq.core.server.HornetQServer;
+ <programlisting>import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+
...
-HornetQServer server = HornetQ.newHornetQServer(config);
+HornetQServer server = HornetQServers.newHornetQServer(config);
server.start();</programlisting>
<para>You also have the option of instantiating <literal>HornetQServerImpl</literal>
directly:</para>
- <programlisting>HornetQServer server =
- new HornetQServerImpl(config);
+ <programlisting>HornetQServer server = new HornetQServerImpl(config);
server.start();</programlisting>
</section>
<section>
Modified: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt 2010-08-31 15:46:14 UTC (rev 9616)
+++ branches/Branch_2_1/merge-activity.txt 2010-08-31 16:01:47 UTC (rev 9617)
@@ -9,3 +9,5 @@
There was also a manual copy of JournalImpl.java on this merge, since there was a minor change before that needed to be applied
- 30-aug-2010 - clebert - merge from trunk -r9590:9598
+
+- 31-aug-2010 - jmesnil - merge from trunk -r9615:9616
13 years, 7 months
JBoss hornetq SVN: r9616 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-31 11:46:14 -0400 (Tue, 31 Aug 2010)
New Revision: 9616
Modified:
trunk/docs/user-manual/en/embedding-hornetq.xml
Log:
doc update
* update code snippet to create a new HornetQServer
Modified: trunk/docs/user-manual/en/embedding-hornetq.xml
===================================================================
--- trunk/docs/user-manual/en/embedding-hornetq.xml 2010-08-31 15:22:50 UTC (rev 9615)
+++ trunk/docs/user-manual/en/embedding-hornetq.xml 2010-08-31 15:46:14 UTC (rev 9616)
@@ -66,18 +66,18 @@
<para>You need to instantiate and start HornetQ server. The class <literal
>org.hornetq.api.core.server.HornetQ</literal> has a few static methods for creating
servers with common configurations.</para>
- <programlisting>import org.hornetq.api.core.server.HornetQ;
-import org.hornetq.core.server.HornetQServer;
+ <programlisting>import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+
...
-HornetQServer server = HornetQ.newHornetQServer(config);
+HornetQServer server = HornetQServers.newHornetQServer(config);
server.start();</programlisting>
<para>You also have the option of instantiating <literal>HornetQServerImpl</literal>
directly:</para>
- <programlisting>HornetQServer server =
- new HornetQServerImpl(config);
+ <programlisting>HornetQServer server = new HornetQServerImpl(config);
server.start();</programlisting>
</section>
<section>
13 years, 7 months
JBoss hornetq SVN: r9615 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: protocol/core/impl and 2 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-31 11:22:50 -0400 (Tue, 31 Aug 2010)
New Revision: 9615
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
Clean server shutdown
* when the server shuts down, it sends a DISCONNECT packet to its connections with its node ID.
* ClientSessionFactory will handle the DISCONNECT by first triggering connection failures and *only afterwards* notifying the server locator that the node is down
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -43,6 +43,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -1152,6 +1153,7 @@
if (type == PacketImpl.DISCONNECT)
{
+ final DisconnectMessage msg = (DisconnectMessage)packet;
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1160,6 +1162,10 @@
{
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
+ if (msg.getNodeID() != null)
+ {
+ serverLocator.notifyNodeDown(msg.getNodeID().toString());
+ }
}
});
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -579,12 +579,13 @@
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
+ /*
if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
-
+ */
retry = true;
}
else
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -90,6 +90,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -174,7 +175,7 @@
}
case DISCONNECT:
{
- packet = new PacketImpl(PacketImpl.DISCONNECT);
+ packet = new DisconnectMessage();
break;
}
case EXCEPTION:
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -31,6 +31,7 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -307,11 +308,8 @@
channel.flushConfirmations();
}
- if (nodeID != null)
- {
- channel0.send(new ClusterTopologyChangeMessage(nodeID.toString()));
- }
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ Packet disconnect = new DisconnectMessage(nodeID);
+ channel0.sendAndFlush(disconnect);
}
public long generateChannelID()
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ *
+ * A DisconnectMessage
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class DisconnectMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private SimpleString nodeID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DisconnectMessage(final SimpleString nodeID)
+ {
+ super(PacketImpl.DISCONNECT);
+
+ this.nodeID = nodeID;
+ }
+
+ public DisconnectMessage()
+ {
+ super(PacketImpl.DISCONNECT);
+ }
+
+ // Public --------------------------------------------------------
+
+ public SimpleString getNodeID()
+ {
+ return nodeID;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeNullableSimpleString(nodeID);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ nodeID = buffer.readNullableSimpleString();
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getParentString());
+ buf.append(", nodeID=" + nodeID);
+ buf.append("]");
+ return buf.toString();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof DisconnectMessage == false)
+ {
+ return false;
+ }
+
+ DisconnectMessage r = (DisconnectMessage)other;
+
+ return super.equals(other) && nodeID.equals(r.nodeID);
+ }
+
+ @Override
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-31 12:14:53 UTC (rev 9614)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-31 15:22:50 UTC (rev 9615)
@@ -227,4 +227,52 @@
return serverLocator.createSessionFactory(connector);
}
+
+ @Override
+ public void connectionFailed(HornetQException me)
+ {
+ if (!session.isClosed())
+ {
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to clean up the session after a connection failure", e);
+ }
+ serverLocator.notifyNodeDown(targetNodeID);
+ if (serverLocator.getDiscoveryAddress() == null)
+ {
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ ClientSessionFactory sf = null;
+ do
+ {
+ try
+ {
+ sf = serverLocator.createSessionFactory(connector);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ continue;
+ }
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ while (sf == null);
+ }
+ });
+ }
+ }
+ super.connectionFailed(me);
+ }
}
13 years, 7 months
JBoss hornetq SVN: r9614 - branches/Branch_2_1/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-31 08:14:53 -0400 (Tue, 31 Aug 2010)
New Revision: 9614
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
typo: accidental commit
Modified: branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 08:38:19 UTC (rev 9613)
+++ branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 12:14:53 UTC (rev 9614)
@@ -1087,7 +1087,7 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
- final int ackBatchSize_xx) throws HornetQException
+ final int ackBatchSize) throws HornetQException
{
if (closed)
{
13 years, 7 months
JBoss hornetq SVN: r9613 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-31 04:38:19 -0400 (Tue, 31 Aug 2010)
New Revision: 9613
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
add tests to check topology updates on HA clients
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-08-31 02:31:32 UTC (rev 9612)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-08-31 08:38:19 UTC (rev 9613)
@@ -246,6 +246,8 @@
sf.close();
+ locator.close();
+
stopServers(0);
}
@@ -319,6 +321,8 @@
checkContains(new int[] {}, nodeIDs, nodes);
sf.close();
+
+ locator.close();
}
public void testStopNodes() throws Throwable
@@ -399,9 +403,70 @@
{
}
+
+ locator.close();
}
-
+ public void testMultipleClientSessionFactories() throws Throwable
+ {
+ startServers(0, 1, 2, 3, 4);
+ String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+ ServerLocator locator = createHAServerLocator();
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
+ {
+ public void nodeUP(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last,
+ int distance)
+ {
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
+ }
+
+ public void nodeDown(String nodeID)
+ {
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ }
+ }
+ });
+
+ ClientSessionFactory[] sfs = new ClientSessionFactory[] {
+ locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory() };
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ stopServers(0, 4, 2, 3, 1);
+ checkContains(new int[] { }, nodeIDs, nodes);
+
+ for (int i = 0; i < sfs.length; i++)
+ {
+ ClientSessionFactory sf = sfs[i];
+ sf.close();
+ }
+
+ locator.close();
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
13 years, 7 months
JBoss hornetq SVN: r9612 - in branches/Branch_2_1: docs/user-manual/zh and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-30 22:31:32 -0400 (Mon, 30 Aug 2010)
New Revision: 9612
Modified:
branches/Branch_2_1/docs/user-manual/zh/appserver-integration.xml
branches/Branch_2_1/docs/user-manual/zh/persistence.xml
branches/Branch_2_1/merge-activity.txt
branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_1/src/main/org/hornetq/jms/client/HornetQSession.java
branches/Branch_2_1/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
branches/Branch_2_1/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
Merging -r9590:9598 (including https://jira.jboss.org/browse/HORNETQ-495) from trunk
Modified: branches/Branch_2_1/docs/user-manual/zh/appserver-integration.xml
===================================================================
--- branches/Branch_2_1/docs/user-manual/zh/appserver-integration.xml 2010-08-30 14:18:38 UTC (rev 9611)
+++ branches/Branch_2_1/docs/user-manual/zh/appserver-integration.xml 2010-08-31 02:31:32 UTC (rev 9612)
@@ -627,6 +627,17 @@
<entry>Integer</entry>
<entry>线程池的大小</entry>
</row>
+ <row>
+ <entry>SetupAttempts</entry>
+ <entry>Integer</entry>
+ <entry>尝试建立JMS连接的次数(默认值是10。-1表示无限次进行尝试)。有时MDB在部署时相关的JMS资源还没有准备好,这时通过多次的
+ 尝试直到JMS资源连接上为止。只适用于内部(Inbound)连接的情况。</entry>
+ </row>
+ <row>
+ <entry>SetupInterval</entry>
+ <entry>Long</entry>
+ <entry>每两次相邻尝试之间的时间间隔,以毫秒为单位(默认值为2000毫秒)。只适用于内部(Inbound)连接的情况。</entry>
+ </row>
</tbody>
</tgroup>
</informaltable>
Modified: branches/Branch_2_1/docs/user-manual/zh/persistence.xml
===================================================================
--- branches/Branch_2_1/docs/user-manual/zh/persistence.xml 2010-08-30 14:18:38 UTC (rev 9611)
+++ branches/Branch_2_1/docs/user-manual/zh/persistence.xml 2010-08-31 02:31:32 UTC (rev 9612)
@@ -56,23 +56,32 @@
</listitem>
</itemizedlist>
<para>标准的HornetQ核心服务器使用了两种日志:</para>
- <itemizedlist>
+ <itemizedlist id="persistence.journallist">
<listitem>
<para>绑定日志</para>
<para>这个日志用来保存与绑定有关的数据。其中包括在HornetQ上部署的队列及其属性,还有ID序列计数器。 </para>
<para>绑定日志是一个NIO型日志。与消息日志相比它的呑吐量是比较低的。</para>
+ <para>这种日志文件的名字采用<literal>hornetq-bindings</literal>作为前缀。每个文件都有
+ <literal>bindings</literal>这样的扩展。文件大小是<literal
+ >1048576</literal>,它的位置在bindings文件夹下。</para>
</listitem>
<listitem>
<para>JMS日志</para>
<para>这个日志保存所有JMS相关的数据,包括JMS队列,话题及连接工厂,以及它们的JNDI绑定信息。</para>
<para>通过管理接口创建的JMS资源将被保存在这个日志中。但是通过配置文件配置的资源则不保存。只有使用JMS时JMS的日志
才被创建。</para>
+ <para>这种日志文件的名字采用<literal>hornetq-jms</literal>作为前缀。每个文件都有
+ <literal>jms</literal>这样的扩展。文件大小是<literal
+ >1048576</literal>,它的位置在bindings文件夹下。</para>
</listitem>
<listitem>
<para>消息日志</para>
<para>这个日志用来存贮所有消息相关的数据,包括消息本身和重复ID缓存。</para>
<para>默认情况下HornetQ总是优先使用AIO型日志。如果AIO型日志不可用(比如在非Linux平台上运行,或系统内核版本不同)
它将自动使用NIO型日志。</para>
+ <para>这种日志文件的名字采用<literal>hornetq-data</literal>作为前缀。每个文件都有
+ <literal>hq</literal>作为扩展名。默认的文件大小是 <literal
+ >10485760</literal> (可配置)。文件保存在journal文件夹下。</para>
</listitem>
</itemizedlist>
<para>对于超大消息,Hornet将它们保存在消息日志之外的地方。详见<xref linkend="large-messages"/>.</para>
@@ -220,4 +229,35 @@
参数设为<literal>false</literal>即可。 </para>
<para>注意如果你将该参数设为 false来关闭持久化,就意味着所有的绑定数据、消息数据、超大消息数据、重复ID缓冲以及转移(paging)数据都将不会被持久。</para>
</section>
+ <section id="persistence.importexport">
+ <title>导入/导出日志数据</title>
+ <para>有时你需要使用导入/导出工具来查看日志文件的记录。这个导入/导出工具类在hornetq-core.jar文件中。
+ 使用以下命令可以将日志文件导出为文本文件:</para>
+ <para><literal>java -cp hornetq-core.jar org.hornetq.core.journal.impl.ExportJournal
+ &lt;JournalDirectory&gt; &lt;JournalPrefix&gt; &lt;FileExtension&gt; &lt;FileSize&gt;
+ &lt;FileOutput&gt;</literal></para>
+ <para>要将日志文件导入,使用下面的命令(注意你需要netty.jar):</para>
+ <para><literal>java -cp hornetq-core.jar:netty.jar org.hornetq.core.journal.impl.ImportJournal
+ &lt;JournalDirectory&gt; &lt;JournalPrefix&gt; &lt;FileExtension&gt; &lt;FileSize&gt;
+ &lt;FileInput&gt;</literal></para>
+ <itemizedlist>
+ <listitem>
+ <para>JournalDirectory:文件的位置,如./hornetq/data/journal</para>
+ </listitem>
+ <listitem>
+ <para>JournalPrefix: 日志文件的前缀。<link linkend="persistence.journallist">这里</link>有关于前缀的详细描述。</para>
+ </listitem>
+ <listitem>
+ <para>FileExtension: 文件的扩展名。详细讨论参见<link linkend="persistence.journallist">这里</link>。
+ </para>
+ </listitem>
+ <listitem>
+ <para>FileSize:日志文件的大小。详细讨论参见<link linkend="persistence.journallist">这里</link>。</para>
+ </listitem>
+ <listitem>
+ <para>FileOutput:输出的文本文件名。</para>
+ </listitem>
+ </itemizedlist>
+ </section>
+
</chapter>
Modified: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt 2010-08-30 14:18:38 UTC (rev 9611)
+++ branches/Branch_2_1/merge-activity.txt 2010-08-31 02:31:32 UTC (rev 9612)
@@ -7,3 +7,5 @@
- 24-aug-2010 - clebert - merge from trunk -r9588:9590
There was also a manual copy of JournalImpl.java on this merge, since there was a minor change before that needed to be applied
+
+- 30-aug-2010 - clebert - merge from trunk -r9590:9598
Modified: branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-30 14:18:38 UTC (rev 9611)
+++ branches/Branch_2_1/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-31 02:31:32 UTC (rev 9612)
@@ -1087,7 +1087,7 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
- final int ackBatchSize) throws HornetQException
+ final int ackBatchSize_xx) throws HornetQException
{
if (closed)
{
Modified: branches/Branch_2_1/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/jms/client/HornetQSession.java 2010-08-30 14:18:38 UTC (rev 9611)
+++ branches/Branch_2_1/src/main/org/hornetq/jms/client/HornetQSession.java 2010-08-31 02:31:32 UTC (rev 9612)
@@ -213,6 +213,11 @@
return ackMode;
}
+
+ public boolean isXA()
+ {
+ return xa;
+ }
public void commit() throws JMSException
{
Modified: branches/Branch_2_1/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java 2010-08-30 14:18:38 UTC (rev 9611)
+++ branches/Branch_2_1/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java 2010-08-31 02:31:32 UTC (rev 9612)
@@ -42,9 +42,9 @@
private final boolean transactedOrClientAck;
protected JMSMessageListenerWrapper(final HornetQSession session,
- final ClientConsumer consumer,
- final MessageListener listener,
- final int ackMode)
+ final ClientConsumer consumer,
+ final MessageListener listener,
+ final int ackMode)
{
this.session = session;
@@ -52,7 +52,7 @@
this.listener = listener;
- transactedOrClientAck = ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE;
+ transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
}
/**
Modified: branches/Branch_2_1/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-08-30 14:18:38 UTC (rev 9611)
+++ branches/Branch_2_1/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-08-31 02:31:32 UTC (rev 9612)
@@ -68,11 +68,16 @@
private boolean useLocalTx;
+ private boolean transacted;
+
private final int sessionNr;
private final TransactionManager tm;
- public HornetQMessageHandler(final HornetQActivation activation, final TransactionManager tm, final ClientSession session, final int sessionNr)
+ public HornetQMessageHandler(final HornetQActivation activation,
+ final TransactionManager tm,
+ final ClientSession session,
+ final int sessionNr)
{
this.activation = activation;
this.session = session;
@@ -96,14 +101,16 @@
{
String subscriptionName = spec.getSubscriptionName();
String clientID = spec.getClientID();
-
+
// Durable sub
if (clientID == null)
{
- throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName + " - client ID has not been set");
+ throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName +
+ " - client ID has not been set");
}
- SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID, subscriptionName));
+ SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID,
+ subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName);
@@ -123,9 +130,10 @@
SimpleString oldFilterString = subResponse.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null ||
- oldFilterString == null && selector != null ||
- (oldFilterString != null && selector != null &&
- !oldFilterString.toString().equals(selector));
+ oldFilterString == null &&
+ selector != null ||
+ (oldFilterString != null && selector != null && !oldFilterString.toString()
+ .equals(selector));
SimpleString oldTopicName = subResponse.getAddress();
@@ -155,7 +163,7 @@
}
else
{
- queueName = activation.getTopicTemporaryQueue();
+ queueName = activation.getTopicTemporaryQueue();
}
}
else
@@ -168,6 +176,7 @@
// Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
+ transacted = activation.isDeliveryTransacted();
if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
{
endpoint = endpointFactory.createEndpoint(session);
@@ -201,7 +210,7 @@
{
HornetQMessageHandler.log.debug("Error releasing endpoint " + endpoint, t);
}
-
+
try
{
consumer.close();
@@ -246,15 +255,28 @@
try
{
- if(activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
+ if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
{
tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
}
endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
beforeDelivery = true;
msg.doBeforeReceive();
+
+ //In the transacted case the message must be acked *before* onMessage is called
+
+ if (transacted)
+ {
+ message.acknowledge();
+ }
+
((MessageListener)endpoint).onMessage(msg);
- message.acknowledge();
+
+ if (!transacted)
+ {
+ message.acknowledge();
+ }
+
try
{
endpoint.afterDelivery();
13 years, 7 months
JBoss hornetq SVN: r9611 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/management/impl and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-30 10:18:38 -0400 (Mon, 30 Aug 2010)
New Revision: 9611
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
add tests to check topology updates on HA clients
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -579,6 +579,11 @@
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
+ if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
+ }
retry = true;
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -143,7 +143,14 @@
clearIO();
try
{
+ if (configuration.getStaticConnectors() == null)
+ {
+ return null;
+ }
+ else
+ {
return configuration.getStaticConnectors().toArray(new String[0]);
+ }
}
finally
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -68,7 +68,8 @@
interceptors,
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
.getExecutor()
- : null);
+ : null,
+ server.getNodeID());
Channel channel1 = rc.getChannel(1, -1);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -25,10 +25,12 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -88,6 +90,8 @@
private final Executor executor;
private volatile boolean executing;
+
+ private final SimpleString nodeID;
// Constructors
// ---------------------------------------------------------------------------------
@@ -99,7 +103,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, blockingCallTimeout, interceptors, true, null);
+ this(transportConnection, blockingCallTimeout, interceptors, true, null, null);
}
/*
@@ -107,17 +111,19 @@
*/
public RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
- final Executor executor)
+ final Executor executor,
+ final SimpleString nodeID)
{
- this(transportConnection, -1, interceptors, false, executor);
+ this(transportConnection, -1, interceptors, false, executor, nodeID);
}
private RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean client,
- final Executor executor)
+ final Executor executor,
+ final SimpleString nodeID)
{
this.transportConnection = transportConnection;
@@ -129,6 +135,8 @@
this.client = client;
this.executor = executor;
+
+ this.nodeID = nodeID;
}
// RemotingConnection implementation
@@ -299,6 +307,10 @@
channel.flushConfirmations();
}
+ if (nodeID != null)
+ {
+ channel0.send(new ClusterTopologyChangeMessage(nodeID.toString()));
+ }
channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-30 08:13:19 UTC (rev 9610)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -227,53 +227,4 @@
return serverLocator.createSessionFactory(connector);
}
-
- @Override
- public void connectionFailed(HornetQException me)
- {
- if (!session.isClosed())
- {
- try
- {
- session.cleanUp(false);
- }
- catch (Exception e)
- {
- log.warn("Unable to clean up the session after a connection failure", e);
- }
- serverLocator.notifyNodeDown(targetNodeID);
- if (serverLocator.getDiscoveryAddress() == null)
- {
- executor.execute(new Runnable()
- {
-
- public void run()
- {
- ClientSessionFactory sf = null;
- do
- {
- try
- {
- sf = serverLocator.createSessionFactory(connector);
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- continue;
- }
- }
- catch (Exception e)
- {
- break;
- }
- }
- while (sf == null);
- }
- });
- }
- }
- super.connectionFailed(me);
- }
-
}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class HAClientTopologyTest extends TopologyClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(HAClientTopologyTest.class);
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
+ setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
+ setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
+ setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
+ setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
+ }
+
+ protected void setupServers() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+ setupServer(3, isFileStorage(), isNetty());
+ setupServer(4, isFileStorage(), isNetty());
+ }
+
+ @Override
+ protected ServerLocator createHAServerLocator()
+ {
+ TransportConfiguration tc = createTransportConfiguration(isNetty(), false, generateParams(0, isNetty()));
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(tc);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ return locator;
+ }
+}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class HAClientTopologyWithDiscoveryTest extends TopologyClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(HAClientTopologyWithDiscoveryTest.class);
+
+ protected static final String groupAddress = "230.1.2.3";
+
+ protected static final int groupPort = 6745;
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster1", 1, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster2", 2, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster3", 3, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupDiscoveryClusterConnection("cluster4", 4, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty());
+ }
+
+ protected void setupServers() throws Exception
+ {
+ setupServerWithDiscovery(0, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(1, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(2, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(3, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ setupServerWithDiscovery(4, groupAddress, groupPort, isFileStorage(), isNetty(), false);
+ }
+
+ @Override
+ protected ServerLocator createHAServerLocator()
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(groupAddress, groupPort);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ return locator;
+ }
+}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+/**
+ * A NettyHAClientTopologyTest
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class NettyHAClientTopologyTest extends HAClientTopologyTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+/**
+ * A NettyHAClientTopologyWithDiscoveryTest
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWithDiscoveryTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-08-30 14:18:38 UTC (rev 9611)
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.topology;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A TopologyClusterTestBase
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public abstract class TopologyClusterTestBase extends ClusterTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(TopologyClusterTestBase.class);
+
+ private static final long WAIT_TIMEOUT = 30000;
+
+ abstract protected ServerLocator createHAServerLocator();
+
+ abstract protected void setupServers() throws Exception;
+
+ abstract protected void setupCluster() throws Exception;
+
+ abstract protected boolean isNetty() throws Exception;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServers();
+
+ setupCluster();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ stopServers(0, 1, 2, 3, 4);
+
+ super.tearDown();
+ }
+
+ /**
+ * Check that the actual list of received nodeIDs correspond to the expected order of nodes
+ */
+ protected void checkOrder(int[] expected, String[] nodeIDs, List<String> actual)
+ {
+ assertEquals(expected.length, actual.size());
+ for (int i = 0; i < expected.length; i++)
+ {
+ assertEquals("did not receive expected nodeID at " + i, nodeIDs[expected[i]], actual.get(i));
+ }
+ }
+
+ protected void checkContains(int[] expected, String[] nodeIDs, List<String> actual)
+ {
+ long start = System.currentTimeMillis();
+ do
+ {
+ if (expected.length != actual.size())
+ {
+ continue;
+ }
+ boolean ok = true;
+ for (int i = 0; i < expected.length; i++)
+ {
+ ok = (ok && actual.contains(nodeIDs[expected[i]]));
+ }
+ if (ok)
+ {
+ return;
+ }
+ } while(System.currentTimeMillis() - start < 5000);
+ fail("did not contain all expected node ID: " + actual);
+ }
+
+ protected String[] getNodeIDs(int... nodes)
+ {
+ String[] nodeIDs = new String[nodes.length];
+ for (int i = 0; i < nodes.length; i++)
+ {
+ nodeIDs[i] = servers[i].getNodeID().toString();
+ }
+ return nodeIDs;
+ }
+
+ protected ClientSession checkSessionOrReconnect(ClientSession session, ServerLocator locator) throws Exception
+ {
+ try
+ {
+ String rand = RandomUtil.randomString();
+ session.createQueue(rand, rand);
+ session.deleteQueue(rand);
+ return session;
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.OBJECT_CLOSED || e.getCode() == HornetQException.UNBLOCKED)
+ {
+ ClientSessionFactory sf = locator.createSessionFactory();
+ return sf.createSession();
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ protected void waitForClusterConnections(final int node, final int count) throws Exception
+ {
+ HornetQServer server = servers[node];
+
+ if (server == null)
+ {
+ throw new IllegalArgumentException("No server at " + node);
+ }
+
+ ClusterManager clusterManager = server.getClusterManager();
+
+ long start = System.currentTimeMillis();
+
+ do
+ {
+ int nodesCount = 0;
+
+ for (ClusterConnection clusterConn : clusterManager.getClusterConnections())
+ {
+ nodesCount += clusterConn.getNodes().size();
+ }
+
+ if (nodesCount == count)
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
+
+ log.error(clusterDescription(servers[node]));
+ throw new IllegalStateException("Timed out waiting for cluster connections ");
+ }
+
+ /**
+  * Starts server 0, registers a topology listener, then starts servers 1, 4, 3 and 2 and later
+  * stops servers 2, 3, 1 and 4. Verifies that the listener recorded all five nodes as up and,
+  * after the stops, that only server 0 is still recorded.
+  */
+ public void testReceiveNotificationsWhenOtherNodesAreStartedAndStopped() throws Throwable
+ {
+    startServers(0);
+
+    ServerLocator locator = createHAServerLocator();
+
+    // The list is written by the listener callbacks and read by this method. The callbacks are
+    // presumably delivered on a different thread (the test waits for them through latches), so a
+    // thread-safe list with atomic add-if-absent/remove is used instead of a bare ArrayList.
+    final java.util.concurrent.CopyOnWriteArrayList<String> nodes =
+       new java.util.concurrent.CopyOnWriteArrayList<String>();
+    final CountDownLatch upLatch = new CountDownLatch(5);
+    final CountDownLatch downLatch = new CountDownLatch(4);
+
+    locator.addClusterTopologyListener(new ClusterTopologyListener()
+    {
+       public void nodeUP(String nodeID,
+                          Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                          boolean last,
+                          int distance)
+       {
+          // Count each node only the first time it is announced.
+          if (nodes.addIfAbsent(nodeID))
+          {
+             upLatch.countDown();
+          }
+       }
+
+       public void nodeDown(String nodeID)
+       {
+          // Only nodes previously seen as up count towards the down latch.
+          if (nodes.remove(nodeID))
+          {
+             downLatch.countDown();
+          }
+       }
+    });
+
+    ClientSessionFactory sf = locator.createSessionFactory();
+
+    startServers(1, 4, 3, 2);
+    String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+    assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+    checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
+
+    // Every server is expected to report 4 nodes through its cluster connections.
+    for (int node = 0; node < 5; node++)
+    {
+       waitForClusterConnections(node, 4);
+    }
+
+    stopServers(2, 3, 1, 4);
+
+    waitForClusterConnections(0, 0);
+
+    assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+    checkContains(new int[] { 0 }, nodeIDs, nodes);
+
+    sf.close();
+
+    stopServers(0);
+ }
+
+ /**
+  * Starts five servers, registers a topology listener and verifies that all five nodes are
+  * reported as up. The servers are then stopped one at a time (0, 2, 4, 3, 1); after each of the
+  * first four stops the session is re-validated and the set of recorded nodes is checked.
+  */
+ public void testReceiveNotifications() throws Throwable
+ {
+    startServers(0, 1, 2, 3, 4);
+    String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+    ServerLocator locator = createHAServerLocator();
+
+    // The list is written by the listener callbacks and read by this method without any latch in
+    // between for the intermediate checks. The callbacks are presumably delivered on a different
+    // thread, so a thread-safe list with atomic add-if-absent/remove replaces the bare ArrayList.
+    final java.util.concurrent.CopyOnWriteArrayList<String> nodes =
+       new java.util.concurrent.CopyOnWriteArrayList<String>();
+    final CountDownLatch upLatch = new CountDownLatch(5);
+    final CountDownLatch downLatch = new CountDownLatch(4);
+
+    locator.addClusterTopologyListener(new ClusterTopologyListener()
+    {
+       public void nodeUP(String nodeID,
+                          Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                          boolean last,
+                          int distance)
+       {
+          // Count each node only the first time it is announced.
+          if (nodes.addIfAbsent(nodeID))
+          {
+             upLatch.countDown();
+          }
+       }
+
+       public void nodeDown(String nodeID)
+       {
+          // Only nodes previously seen as up count towards the down latch.
+          if (nodes.remove(nodeID))
+          {
+             downLatch.countDown();
+          }
+       }
+    });
+
+    ClientSessionFactory sf = locator.createSessionFactory();
+
+    assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+    checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+
+    ClientSession session = sf.createSession();
+
+    // Every server is expected to report 4 nodes through its cluster connections.
+    for (int node = 0; node < 5; node++)
+    {
+       waitForClusterConnections(node, 4);
+    }
+
+    // NOTE(review): the checks below run right after the session probe, without waiting for the
+    // matching nodeDown notification - looks timing-dependent, confirm.
+    stopServers(0);
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+
+    stopServers(2);
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+
+    stopServers(4);
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+
+    stopServers(3);
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1 }, nodeIDs, nodes);
+
+    stopServers(1);
+
+    assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+    checkContains(new int[] {}, nodeIDs, nodes);
+
+    sf.close();
+ }
+
+ /**
+  * Starts five servers, registers a topology listener and verifies that all five nodes are
+  * reported as up. The servers are then stopped one at a time (0, 2, 4, 3, 1); after each stop
+  * the server must report that it is no longer started. After the first four stops the session
+  * is re-validated and the recorded nodes are checked; after the last stop re-validating the
+  * session must fail.
+  */
+ public void testStopNodes() throws Throwable
+ {
+    startServers(0, 1, 2, 3, 4);
+    String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+
+    ServerLocator locator = createHAServerLocator();
+
+    // The list is written by the listener callbacks and read by this method without any latch in
+    // between for the intermediate checks. The callbacks are presumably delivered on a different
+    // thread, so a thread-safe list with atomic add-if-absent replaces the bare ArrayList.
+    final java.util.concurrent.CopyOnWriteArrayList<String> nodes =
+       new java.util.concurrent.CopyOnWriteArrayList<String>();
+    final CountDownLatch upLatch = new CountDownLatch(5);
+
+    locator.addClusterTopologyListener(new ClusterTopologyListener()
+    {
+       public void nodeUP(String nodeID,
+                          Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                          boolean last,
+                          int distance)
+       {
+          // Count each node only the first time it is announced.
+          if (nodes.addIfAbsent(nodeID))
+          {
+             upLatch.countDown();
+          }
+       }
+
+       public void nodeDown(String nodeID)
+       {
+          // Removing an ID that is not present is a no-op.
+          nodes.remove(nodeID);
+       }
+    });
+
+    ClientSessionFactory sf = locator.createSessionFactory();
+
+    assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+    checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+
+    // Every server is expected to report 4 nodes through its cluster connections.
+    for (int node = 0; node < 5; node++)
+    {
+       waitForClusterConnections(node, 4);
+    }
+
+    ClientSession session = sf.createSession();
+
+    // NOTE(review): the checkContains calls below run right after the session probe, without
+    // waiting for the matching nodeDown notification - looks timing-dependent, confirm.
+    stopServers(0);
+    assertFalse(servers[0].isStarted());
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+
+    stopServers(2);
+    assertFalse(servers[2].isStarted());
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+
+    stopServers(4);
+    assertFalse(servers[4].isStarted());
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+
+    stopServers(3);
+    assertFalse(servers[3].isStarted());
+    session = checkSessionOrReconnect(session, locator);
+    checkContains(new int[] { 1 }, nodeIDs, nodes);
+
+    stopServers(1);
+    assertFalse(servers[1].isStarted());
+    try
+    {
+       session = checkSessionOrReconnect(session, locator);
+       fail();
+    }
+    catch (Exception expected)
+    {
+       // Expected: every server has been stopped, so the session can neither be used nor replaced.
+    }
+
+    // Release the factory, as the sibling notification tests do once all servers are stopped.
+    sf.close();
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
13 years, 7 months