Author: clebert.suconic(a)jboss.com
Date: 2010-09-02 22:05:02 -0400 (Thu, 02 Sep 2010)
New Revision: 9634
Added:
branches/Branch_2_1/examples/soak/tx-restarts/
branches/Branch_2_1/examples/soak/tx-restarts/README
branches/Branch_2_1/examples/soak/tx-restarts/build.bat
branches/Branch_2_1/examples/soak/tx-restarts/build.sh
branches/Branch_2_1/examples/soak/tx-restarts/build.xml
branches/Branch_2_1/examples/soak/tx-restarts/server0/
branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml
branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties
branches/Branch_2_1/examples/soak/tx-restarts/src/
branches/Branch_2_1/examples/soak/tx-restarts/src/org/
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
Log:
Adding soak test to validate journal restart on transactions
Property changes on: branches/Branch_2_1/examples/soak/tx-restarts
___________________________________________________________________
Name: svn:ignore
+ build
Added: branches/Branch_2_1/examples/soak/tx-restarts/README
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/README (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/README 2010-09-03 02:05:02 UTC (rev
9634)
@@ -0,0 +1,20 @@
+****************************************************
+* Soak Test For TX survival over restarts
+****************************************************
+
+Run The Test
+==============
+
+To run the test simply use ./build.sh
+
+It's important that you always clean the data directory before starting the test, as
it will validate for sequences generated.
+
+The test will start and stop a server multiple times.
+
+
+Run the server remotely
+=======================
+
+You can start the server directly if you want, you can just start the server as:
+
+
Added: branches/Branch_2_1/examples/soak/tx-restarts/build.bat
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/build.bat (rev
0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/build.bat 2010-09-03 02:05:02 UTC (rev
9634)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: branches/Branch_2_1/examples/soak/tx-restarts/build.sh
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/build.sh (rev
0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/build.sh 2010-09-03 02:05:02 UTC (rev
9634)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Property changes on: branches/Branch_2_1/examples/soak/tx-restarts/build.sh
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_2_1/examples/soak/tx-restarts/build.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/build.xml (rev
0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/build.xml 2010-09-03 02:05:02 UTC (rev
9634)
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project default="run" name="TX-Restarts soak test">
+
+ <import file="../../common/build.xml"/>
+ <property file="ant.properties"/>
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.jms.example.TXRestartSoak"/>
+
+ <param name="java-min-memory" value="1G"/>
+ <param name="java-max-memory" value="1G"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname"
value="org.hornetq.jms.example.TXRestartSoak"/>
+ <param name="hornetq.example.runServer"
value="false"/>
+ </antcall>
+ </target>
+
+</project>
Property changes on: branches/Branch_2_1/examples/soak/tx-restarts/server0
___________________________________________________________________
Name: svn:ignore
+ data
Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties
(rev 0)
+++
branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml
(rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer"
class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration"
class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager"
class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer"
class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager"
class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
(rev 0)
+++
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,62 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ </acceptors>
+
+ <address-settings>
+ <address-setting match="jms.queue.#">
+ <max-delivery-attempts>-1</max-delivery-attempts>
+ <!-- <max-size-bytes>335544320000</max-size-bytes> -->
+ <max-size-bytes>33554432</max-size-bytes>
+ <page-size-bytes>16777216</page-size-bytes>
+ <address-full-policy>PAGE</address-full-policy>
+ </address-setting>
+
+ </address-settings>
+
+
+ <diverts>
+ <divert name="div1">
+ <address>jms.queue.inputQueue</address>
+ <forwarding-address>jms.queue.diverted1</forwarding-address>
+ <exclusive>true</exclusive>
+ </divert>
+
+ <divert name="div2">
+ <address>jms.queue.inputQueue</address>
+ <forwarding-address>jms.queue.diverted2</forwarding-address>
+ <exclusive>true</exclusive>
+ </divert>
+ </diverts>
+
+
+
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.#">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createNonDurableQueue"
roles="guest"/>
+ <permission type="deleteNonDurableQueue"
roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml
(rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,27 @@
+<configuration xmlns="urn:hornetq"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <min-large-message-size>100240</min-large-message-size>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="inputQueue">
+ <entry name="/queue/inputQueue"/>
+ </queue>
+
+ <queue name="diverted1">
+ <entry name="/queue/diverted1"/>
+ </queue>
+
+ <queue name="diverted2">
+ <entry name="/queue/diverted2"/>
+ </queue>
+</configuration>
Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml
(rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties
(rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,2 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
\ No newline at end of file
Added:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
===================================================================
---
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
(rev 0)
+++
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * WARNING: This is not a sample on how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class is doing the job of a TransactionManager that fits for the purpose
of this test only,
+ * however there are many more pitfalls to deal with Transactions.
+ *
+ * This is just to stress and soak test Transactions with HornetQ.
+ *
+ * And this is dealing with XA directly for the purpose testing only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ *
+ */
+public abstract class ClientAbstract extends Thread
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ClientAbstract.class.getName());
+
+ // Attributes ----------------------------------------------------
+
+ protected InitialContext ctx;
+
+ protected XAConnection conn;
+
+ protected XASession sess;
+
+ protected XAResource activeXAResource;
+
+ protected Xid activeXid;
+
+ protected volatile boolean running = true;
+
+ protected volatile int errors = 0;
+
+ /**
+ * A commit was called
+ * case we don't find the Xid, means it was accepted
+ */
+ protected volatile boolean pendingCommit = false;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected InitialContext getContext(final int serverId) throws Exception
+ {
+ String jndiFilename = "server" + serverId +
"/client-jndi.properties";
+ File jndiFile = new File(jndiFilename);
+ Properties props = new Properties();
+ FileInputStream inStream = null;
+ try
+ {
+ inStream = new FileInputStream(jndiFile);
+ props.load(inStream);
+ }
+ finally
+ {
+ if (inStream != null)
+ {
+ inStream.close();
+ }
+ }
+ return new InitialContext(props);
+
+ }
+
+ public XAConnection getConnection()
+ {
+ return conn;
+ }
+
+ public int getErrorsCount()
+ {
+ return errors;
+ }
+
+ public final void connect()
+ {
+ while (running)
+ {
+ try
+ {
+ disconnect();
+
+ ctx = getContext(0);
+
+ XAConnectionFactory cf =
(XAConnectionFactory)ctx.lookup("/ConnectionFactory");
+
+ conn = cf.createXAConnection();
+
+ sess = conn.createXASession();
+
+ activeXAResource = sess.getXAResource();
+
+ if (activeXid != null)
+ {
+ synchronized (ClientAbstract.class)
+ {
+ Xid[] xids = activeXAResource.recover(XAResource.TMSTARTRSCAN);
+ boolean found = false;
+ for (Xid recXid : xids)
+ {
+ if (recXid.equals(activeXid))
+ {
+ // System.out.println("Calling commit after a prepare on
" + this);
+ found = true;
+ callCommit();
+ }
+ }
+
+ if (!found)
+ {
+ if (pendingCommit)
+ {
+ System.out.println("Doing a commit based on a pending commit
on " + this);
+ onCommit();
+ }
+ else
+ {
+ System.out.println("Doing a rollback on " + this);
+ onRollback();
+ }
+
+ activeXid = null;
+ pendingCommit = false;
+ }
+ }
+ }
+
+ connectClients();
+
+ break;
+ }
+ catch (Exception e)
+ {
+ ClientAbstract.log.warning("Can't connect to server,
retrying");
+ disconnect();
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignored)
+ {
+ // if an interruption was sent, we will respect it and leave the loop
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ connect();
+ }
+
+ protected void callCommit() throws Exception
+ {
+ pendingCommit = true;
+ activeXAResource.commit(activeXid, false);
+ pendingCommit = false;
+ activeXid = null;
+ onCommit();
+ }
+
+ protected void callPrepare() throws Exception
+ {
+ activeXAResource.prepare(activeXid);
+ }
+
+ public void beginTX() throws Exception
+ {
+ activeXid = newXID();
+
+ activeXAResource.start(activeXid, XAResource.TMNOFLAGS);
+ }
+
+ public void endTX() throws Exception
+ {
+ activeXAResource.end(activeXid, XAResource.TMSUCCESS);
+ callPrepare();
+ callCommit();
+ }
+
+ public void setRunning(final boolean running)
+ {
+ this.running = running;
+ }
+
+ /**
+ * @return
+ */
+ private XidImpl newXID()
+ {
+ return new XidImpl("tst".getBytes(), 1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
+ protected abstract void connectClients() throws Exception;
+
+ protected abstract void onCommit();
+
+ protected abstract void onRollback();
+
+ public void disconnect()
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ try
+ {
+ if (ctx != null)
+ {
+ ctx.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ ctx = null;
+ conn = null;
+ // it's not necessary to close the session as conn.close() will already take
care of that
+ sess = null;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
===================================================================
---
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
(rev 0)
+++
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+
+/**
+ * A Receiver
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ *
+ */
+public class Receiver extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private Queue queue;
+
+ private final String queueJNDI;
+
+ protected volatile long msgs = 0;
+
+ protected volatile long pendingMsgs = 0;
+
+ protected MessageConsumer cons;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public Receiver(String queueJNDI)
+ {
+ super();
+ this.queueJNDI = queueJNDI;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void run()
+ {
+ super.run();
+
+ while (running)
+ {
+ try
+ {
+ beginTX();
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ Message msg = cons.receive(5000);
+ if (msg == null)
+ {
+ break;
+ }
+
+ if (msg.getLongProperty("count") != msgs + pendingMsgs)
+ {
+ errors++;
+ System.out.println("count should be " + (msgs + pendingMsgs)
+ " when it was " + msg.getLongProperty("count") + " on " +
queueJNDI);
+ }
+
+ pendingMsgs++;
+
+ }
+
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#connectClients()
+ */
+ @Override
+ protected void connectClients() throws Exception
+ {
+
+ queue = (Queue)ctx.lookup(queueJNDI);
+
+ cons = sess.createConsumer(queue);
+
+ conn.start();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ msgs += pendingMsgs;
+ pendingMsgs = 0;
+ System.out.println("Commit on consumer " + queueJNDI + ",
msgs=" + msgs);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ System.out.println("Rollback on consumer " + queueJNDI + ",
msgs=" + msgs);
+ pendingMsgs = 0;
+ }
+
+ public String toString()
+ {
+ return "Receiver::" + this.queueJNDI + ", msgs=" + msgs +
", pending=" + pendingMsgs;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
(rev 0)
+++
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+
+
+/**
+ * A Sender
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ *
+ */
+public class Sender extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected MessageProducer producer;
+ protected Queue queue;
+
+ protected long msgs = TXRestartSoak.MIN_MESSAGES_ON_QUEUE;
+ protected long pendingMsgs = 0;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public Sender()
+ {
+ }
+
+ @Override
+ protected void connectClients() throws Exception
+ {
+ queue = (Queue)ctx.lookup("/queue/inputQueue");
+ producer = sess.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+
+ public void run()
+ {
+ super.run();
+ while (running)
+ {
+ try
+ {
+ beginTX();
+ for (int i = 0 ; i < 1000; i++)
+ {
+ BytesMessage msg = sess.createBytesMessage();
+ msg.setLongProperty("count", pendingMsgs + msgs);
+ msg.writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ pendingMsgs++;
+ }
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ this.msgs += pendingMsgs;
+ pendingMsgs = 0;
+ System.out.println("commit on sender msgs = " + msgs );
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ pendingMsgs = 0;
+ System.out.println("Rolled back msgs=" + msgs);
+ }
+
+ public String toString()
+ {
+ return "Sender, msgs=" + msgs + ", pending=" + pendingMsgs;
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
===================================================================
---
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
(rev 0)
+++
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java 2010-09-03
02:05:02 UTC (rev 9634)
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+
+/**
+ *
+ * This is used as a soak test to verify HornetQ's capability of persistent messages
over restarts.
+ *
+ * This is used as a smoke test before releases.
+ *
+ * WARNING: This is not a sample on how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class is doing the job of a TransactionManager that fits for the purpose
of this test only,
+ * however there are many more pitfalls to deal with Transactions.
+ *
+ * This is just to stress and soak test Transactions with HornetQ.
+ *
+ * And this is dealing with XA directly for the purpose testing only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ *
+ */
+public class TXRestartSoak extends HornetQExample
+{
+
+ public static final int MIN_MESSAGES_ON_QUEUE = 50000;
+
+ private static final Logger log = Logger.getLogger(TXRestartSoak.class.getName());
+
+ public static void main(final String[] args)
+ {
+ new TXRestartSoak().run(args);
+ }
+
+ private TXRestartSoak()
+ {
+ super();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.common.example.HornetQExample#runExample()
+ */
+ @Override
+ public boolean runExample() throws Exception
+ {
+
+ Connection connection = null;
+ InitialContext initialContext = null;
+
+ try
+ {
+ // Step 1. Create an initial context to perform the JNDI lookup.
+ initialContext = getContext(0);
+
+ ConnectionFactory cf =
(ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ // Step 4. Create the JMS objects
+ connection = cf.createConnection();
+
+ // Step 2. Perfom a lookup on the queue
+ Queue queue = (Queue)initialContext.lookup("/queue/inputQueue");
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ for (int i = 0 ; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ BytesMessage msg = session.createBytesMessage();
+ msg.setLongProperty("count", i);
+ msg.writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i + " messages");
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ Sender send = new Sender();
+
+ send.start();
+
+ Receiver rec1 = new Receiver("/queue/diverted1");
+ rec1.start();
+ Receiver rec2 = new Receiver("/queue/diverted2");
+ rec2.start();
+
+
+ if (runServer)
+ {
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+ while (timeEnd > System.currentTimeMillis())
+ {
+ System.out.println("Letting the service run for 20 seconds");
+ Thread.sleep(TimeUnit.SECONDS.toMillis(20));
+ stopServers();
+
+ Thread.sleep(10000);
+
+ boolean disconnected = false;
+
+ if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 ||
rec2.getErrorsCount() != 0)
+ {
+ System.out.println("There are sequence errors in some of the
clients, please look at the logs");
+ break;
+ }
+
+ while (!disconnected)
+ {
+ disconnected = send.getConnection() == null &&
rec1.getConnection() == null && rec2.getConnection() == null;
+ if (!disconnected)
+ {
+ System.out.println("NOT ALL THE CLIENTS ARE DISCONNECTED, NEED
TO WAIT THEM");
+ }
+ Thread.sleep(1000);
+ }
+
+ startServers();
+ }
+ }
+ else
+ {
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+ while (timeEnd > System.currentTimeMillis())
+ {
+
+ }
+ }
+
+ send.setRunning(false);
+ rec1.setRunning(false);
+ rec2.setRunning(false);
+
+ send.join();
+ rec1.join();
+ rec2.join();
+
+ return send.getErrorsCount() == 0 && rec1.getErrorsCount() == 0
&& rec2.getErrorsCount() == 0;
+
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ }
+}