[hornetq-commits] JBoss hornetq SVN: r9634 - in branches/Branch_2_1/examples/soak: tx-restarts and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 2 22:05:03 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-09-02 22:05:02 -0400 (Thu, 02 Sep 2010)
New Revision: 9634

Added:
   branches/Branch_2_1/examples/soak/tx-restarts/
   branches/Branch_2_1/examples/soak/tx-restarts/README
   branches/Branch_2_1/examples/soak/tx-restarts/build.bat
   branches/Branch_2_1/examples/soak/tx-restarts/build.sh
   branches/Branch_2_1/examples/soak/tx-restarts/build.xml
   branches/Branch_2_1/examples/soak/tx-restarts/server0/
   branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties
   branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml
   branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
   branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml
   branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml
   branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties
   branches/Branch_2_1/examples/soak/tx-restarts/src/
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
   branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
Log:
Adding soak test to validate journal restart on transactions


Property changes on: branches/Branch_2_1/examples/soak/tx-restarts
___________________________________________________________________
Name: svn:ignore
   + build


Added: branches/Branch_2_1/examples/soak/tx-restarts/README
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/README	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/README	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,20 @@
+****************************************************
+* Soak Test For TX survival over restarts
+****************************************************
+
+Run The Test
+==============
+
+To run the test simply use ./build.sh
+
+It's important that you always clean the data directory before starting the test, because the test validates the message sequence numbers it generates and data left over from a previous run would break that check.
+
+The test will start and stop a server multiple times.
+
+
+Run the server remotely
+=======================
+
+You can also start the server yourself and then run the test with the runRemote target of build.xml (it sets hornetq.example.runServer to false). Start the server as:
+
+

Added: branches/Branch_2_1/examples/soak/tx-restarts/build.bat
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/build.bat	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/build.bat	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,13 @@
+ at echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+   rem running from TRUNK
+   call ..\..\..\src\bin\build.bat %*
+) else (
+   rem running from the distro
+   call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="

Added: branches/Branch_2_1/examples/soak/tx-restarts/build.sh
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/build.sh	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/build.sh	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+   # running from TRUNK
+   ../../../src/bin/build.sh "$@"
+else
+   # running from the distro
+   ../../../bin/build.sh "$@"
+fi
+
+
+


Property changes on: branches/Branch_2_1/examples/soak/tx-restarts/build.sh
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_2_1/examples/soak/tx-restarts/build.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/build.xml	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/build.xml	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+      <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+      ]>
+<!--
+  ~ Copyright 2009 Red Hat, Inc.
+  ~ Red Hat licenses this file to you under the Apache License, version
+  ~ 2.0 (the "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+  ~ implied.  See the License for the specific language governing
+  ~ permissions and limitations under the License.
+  -->
+
+<project default="run" name="TX-Restarts soak test">
+
+   <import file="../../common/build.xml"/>
+   <property file="ant.properties"/>
+   <target name="run">   	
+      <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/>
+         
+     	   <param name="java-min-memory" value="1G"/>
+     	   <param name="java-max-memory" value="1G"/>
+      </antcall>
+   </target>
+
+   <target name="runRemote">
+      <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/>
+         <param name="hornetq.example.runServer" value="false"/>
+      </antcall>
+   </target>
+   
+</project>


Property changes on: branches/Branch_2_1/examples/soak/tx-restarts/server0
___________________________________________________________________
Name: svn:ignore
   + data


Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/client-jndi.properties	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-beans.xml	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+   <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+   <!-- JNDI server. Disable this if you don't want JNDI -->
+   <bean name="JNDIServer" class="org.jnp.server.Main">
+      <property name="namingInfo">
+         <inject bean="Naming"/>
+      </property>
+      <property name="port">1099</property>
+      <property name="bindAddress">localhost</property>
+      <property name="rmiPort">1098</property>
+      <property name="rmiBindAddress">localhost</property>
+   </bean>
+   
+   <!-- MBean server -->
+   <bean name="MBeanServer" class="javax.management.MBeanServer">
+      <constructor factoryClass="java.lang.management.ManagementFactory"
+                   factoryMethod="getPlatformMBeanServer"/>
+   </bean> 
+
+   <!-- The core configuration -->
+   <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+   <!-- The security manager -->
+   <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+
+   <!-- The core server -->
+   <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+      <constructor>
+         <parameter>
+            <inject bean="Configuration"/>
+         </parameter>
+         <parameter>
+            <inject bean="MBeanServer"/>
+         </parameter>
+         <parameter>
+            <inject bean="HornetQSecurityManager"/>
+         </parameter>        
+      </constructor>
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+   
+   <!-- The JMS server -->
+   <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+      <constructor>         
+         <parameter>
+            <inject bean="HornetQServer"/>
+         </parameter>
+      </constructor>
+   </bean>
+
+</deployment>

Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,62 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+   <!-- Connectors -->
+   <connectors>
+      <connector name="netty-connector">
+         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>         
+      </connector>
+   </connectors>
+   
+   <!-- Acceptors -->
+   <acceptors>
+      <acceptor name="netty-acceptor">
+         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>         
+      </acceptor>
+   </acceptors>
+
+   <address-settings>
+       <address-setting match="jms.queue.#">
+          <max-delivery-attempts>-1</max-delivery-attempts>
+          <!-- <max-size-bytes>335544320000</max-size-bytes> -->
+          <max-size-bytes>33554432</max-size-bytes>
+          <page-size-bytes>16777216</page-size-bytes>
+          <address-full-policy>PAGE</address-full-policy>
+       </address-setting>
+
+   </address-settings>
+
+
+   <diverts>
+      <divert name="div1">
+         <address>jms.queue.inputQueue</address>
+         <forwarding-address>jms.queue.diverted1</forwarding-address>
+         <exclusive>true</exclusive>
+      </divert>
+
+      <divert name="div2">
+         <address>jms.queue.inputQueue</address>
+         <forwarding-address>jms.queue.diverted2</forwarding-address>
+         <exclusive>true</exclusive>
+      </divert>
+   </diverts>
+
+
+
+
+   <!-- Other config -->
+
+   <security-settings>
+      <!--security for example queue-->
+      <security-setting match="jms.queue.#">
+         <permission type="createDurableQueue" roles="guest"/>
+         <permission type="deleteDurableQueue" roles="guest"/>
+         <permission type="createNonDurableQueue" roles="guest"/>
+         <permission type="deleteNonDurableQueue" roles="guest"/>
+         <permission type="consume" roles="guest"/>
+         <permission type="send" roles="guest"/>
+      </security-setting>
+   </security-settings>
+
+</configuration>

Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-jms.xml	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,27 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+   <!--the connection factory used by the example-->
+   <connection-factory name="ConnectionFactory">
+      <connectors>
+         <connector-ref connector-name="netty-connector"/>
+      </connectors>
+      <min-large-message-size>100240</min-large-message-size>
+      <entries>
+         <entry name="ConnectionFactory"/>
+      </entries>
+   </connection-factory>
+
+   <!--the queue used by the example-->
+   <queue name="inputQueue">
+      <entry name="/queue/inputQueue"/>
+   </queue>
+
+   <queue name="diverted1">
+      <entry name="/queue/diverted1"/>
+   </queue>
+
+   <queue name="diverted2">
+      <entry name="/queue/diverted2"/>
+   </queue>
+</configuration>

Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-users.xml	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+   <!-- the default user.  this is used where username is null-->
+   <defaultuser name="guest" password="guest">
+      <role name="guest"/>
+   </defaultuser>
+</configuration>
\ No newline at end of file

Added: branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/server0/jndi.properties	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,2 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
\ No newline at end of file

Added: branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * WARNING: This is not a sample of how you should handle XA.
+ *          You are supposed to use a TransactionManager.
+ *          This class does the job of a TransactionManager only as far as this test needs;
+ *          there are many more pitfalls to deal with when handling transactions.
+ *          
+ *          This is just to stress and soak test transactions with HornetQ.
+ *          
+ *          It deals with XA directly for testing purposes only.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class ClientAbstract extends Thread
+{
+
+   // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(ClientAbstract.class.getName());
+
+   // Attributes ----------------------------------------------------
+
+   protected InitialContext ctx;
+
+   protected XAConnection conn;
+
+   protected XASession sess;
+
+   protected XAResource activeXAResource;
+
+   protected Xid activeXid;
+
+   protected volatile boolean running = true;
+
+   protected volatile int errors = 0;
+
+   /**
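+    * True while a commit call is in flight: set before XAResource.commit() and cleared after it returns.
+    * If, on reconnect, recover() no longer lists the active Xid while this is set, the commit is treated as accepted.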
+    */
+   protected volatile boolean pendingCommit = false;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   protected InitialContext getContext(final int serverId) throws Exception
+   {
+      String jndiFilename = "server" + serverId + "/client-jndi.properties";
+      File jndiFile = new File(jndiFilename);
+      Properties props = new Properties();
+      FileInputStream inStream = null;
+      try
+      {
+         inStream = new FileInputStream(jndiFile);
+         props.load(inStream);
+      }
+      finally
+      {
+         if (inStream != null)
+         {
+            inStream.close();
+         }
+      }
+      return new InitialContext(props);
+
+   }
+
+   public XAConnection getConnection()
+   {
+      return conn;
+   }
+
+   public int getErrorsCount()
+   {
+      return errors;
+   }
+
+   public final void connect()
+   {
+      while (running)
+      {
+         try
+         {
+            disconnect();
+
+            ctx = getContext(0);
+
+            XAConnectionFactory cf = (XAConnectionFactory)ctx.lookup("/ConnectionFactory");
+
+            conn = cf.createXAConnection();
+
+            sess = conn.createXASession();
+
+            activeXAResource = sess.getXAResource();
+
+            if (activeXid != null)
+            {
+               synchronized (ClientAbstract.class)
+               {
+                  Xid[] xids = activeXAResource.recover(XAResource.TMSTARTRSCAN);
+                  boolean found = false;
+                  for (Xid recXid : xids)
+                  {
+                     if (recXid.equals(activeXid))
+                     {
+                        // System.out.println("Calling commit after a prepare on " + this);
+                        found = true;
+                        callCommit();
+                     }
+                  }
+
+                  if (!found)
+                  {
+                     if (pendingCommit)
+                     {
+                        System.out.println("Doing a commit based on a pending commit on " + this);
+                        onCommit();
+                     }
+                     else
+                     {
+                        System.out.println("Doing a rollback on " + this);
+                        onRollback();
+                     }
+
+                     activeXid = null;
+                     pendingCommit = false;
+                  }
+               }
+            }
+
+            connectClients();
+
+            break;
+         }
+         catch (Exception e)
+         {
+            ClientAbstract.log.warning("Can't connect to server, retrying");
+            disconnect();
+            try
+            {
+               Thread.sleep(1000);
+            }
+            catch (InterruptedException ignored)
+            {
+               // if an interruption was sent, we will respect it and leave the loop
+               break;
+            }
+         }
+      }
+   }
+
+   @Override
+   public void run()
+   {
+      connect();
+   }
+
+   protected void callCommit() throws Exception
+   {
+      pendingCommit = true;
+      activeXAResource.commit(activeXid, false);
+      pendingCommit = false;
+      activeXid = null;
+      onCommit();
+   }
+
+   protected void callPrepare() throws Exception
+   {
+      activeXAResource.prepare(activeXid);
+   }
+
+   public void beginTX() throws Exception
+   {
+      activeXid = newXID();
+
+      activeXAResource.start(activeXid, XAResource.TMNOFLAGS);
+   }
+
+   public void endTX() throws Exception
+   {
+      activeXAResource.end(activeXid, XAResource.TMSUCCESS);
+      callPrepare();
+      callCommit();
+   }
+
+   public void setRunning(final boolean running)
+   {
+      this.running = running;
+   }
+
+   /**
+    * @return a new XidImpl built from the constant "tst", the value 1 and a freshly generated UUID string
+    */
+   private XidImpl newXID()
+   {
+      return new XidImpl("tst".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+   }
+
+   protected abstract void connectClients() throws Exception;
+
+   protected abstract void onCommit();
+
+   protected abstract void onRollback();
+
+   public void disconnect()
+   {
+      try
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+      catch (Exception ignored)
+      {
+         ignored.printStackTrace();
+      }
+
+      try
+      {
+         if (ctx != null)
+         {
+            ctx.close();
+         }
+      }
+      catch (Exception ignored)
+      {
+         ignored.printStackTrace();
+      }
+
+      ctx = null;
+      conn = null;
+      // it's not necessary to close the session as conn.close() will already take care of that
+      sess = null;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+
+/**
+ * A Receiver
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Receiver extends ClientAbstract
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   
+   private Queue queue;
+   
+   private final String queueJNDI;
+   
+   protected volatile long msgs = 0;
+   
+   protected volatile long pendingMsgs = 0;
+   
+   protected MessageConsumer cons;
+
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public Receiver(String queueJNDI)
+   {
+      super();
+      this.queueJNDI = queueJNDI;
+   }
+   
+   // Public --------------------------------------------------------
+
+   public void run()
+   {
+      super.run();
+      
+      while (running)
+      {
+         try
+         {
+            beginTX();
+            
+            for (int i = 0 ; i < 1000; i++)
+            {
+               Message msg = cons.receive(5000);
+               if (msg == null)
+               {
+                  break;
+               }
+               
+               if (msg.getLongProperty("count") != msgs + pendingMsgs)
+               {
+                  errors++;
+                  System.out.println("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queueJNDI);
+               }
+               
+               pendingMsgs++;
+               
+            }
+            
+            endTX();
+         }
+         catch (Exception e)
+         {
+            connect();
+         }
+         
+         
+      }
+   }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.jms.example.ClientAbstract#connectClients()
+    */
+   @Override
+   protected void connectClients() throws Exception
+   {
+      
+      queue = (Queue)ctx.lookup(queueJNDI);
+      
+      cons = sess.createConsumer(queue);
+      
+      conn.start();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+    */
+   @Override
+   protected void onCommit()
+   {
+      msgs += pendingMsgs;
+      pendingMsgs = 0;
+      System.out.println("Commit on consumer " + queueJNDI + ", msgs=" + msgs);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+    */
+   @Override
+   protected void onRollback()
+   {
+      System.out.println("Rollback on consumer " + queueJNDI + ", msgs=" + msgs);
+      pendingMsgs = 0;
+   }
+   
+   public String toString()
+   {
+      return "Receiver::" + this.queueJNDI + ", msgs=" + msgs + ", pending=" + pendingMsgs;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+
+
+/**
+ * A Sender
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Sender extends ClientAbstract
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected MessageProducer producer;
+   protected Queue queue;
+   
+   protected long msgs = TXRestartSoak.MIN_MESSAGES_ON_QUEUE;
+   protected long pendingMsgs = 0;
+   
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   public Sender()
+   {
+   }
+
+   @Override
+   protected void connectClients() throws Exception
+   {
+      queue = (Queue)ctx.lookup("/queue/inputQueue");
+      producer = sess.createProducer(queue);
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+   }
+   
+   public void run()
+   {
+      super.run();
+      while (running)
+      {
+         try
+         {
+            beginTX();
+            for (int i = 0 ; i < 1000; i++)
+            {
+               BytesMessage msg = sess.createBytesMessage();
+               msg.setLongProperty("count", pendingMsgs + msgs);
+               msg.writeBytes(new byte[10 * 1024]);
+               producer.send(msg);
+               pendingMsgs++;
+            }
+            endTX();
+         }
+         catch (Exception e)
+         {
+            connect();
+         }
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+    */
+   @Override
+   protected void onCommit()
+   {
+      this.msgs += pendingMsgs;
+      pendingMsgs = 0;
+      System.out.println("commit on sender msgs = " + msgs );
+   }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+    */
+   @Override
+   protected void onRollback()
+   {
+      pendingMsgs = 0;
+      System.out.println("Rolled back msgs=" + msgs);
+   }
+   
+   public String toString()
+   {
+      return "Sender, msgs=" + msgs + ", pending=" + pendingMsgs;
+      
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java	                        (rev 0)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java	2010-09-03 02:05:02 UTC (rev 9634)
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+
+/**
+ * 
+ * This is used as a soak test to verify HornetQ's capability of persistent messages over restarts.
+ * 
+ * This is used as a smoke test before releases.
+ * 
+ * WARNING: This is not a sample on how you should handle XA.
+ *          You are supposed to use a TransactionManager.
+ *          This class is doing the job of a TransactionManager that fits for the purpose of this test only,
+ *          however there are many more pitfalls to deal with Transactions. 
+ *          
+ *          This is just to stress and soak test Transactions with HornetQ. 
+ *          
+ *          And this is dealing with XA directly for the purpose of testing only.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TXRestartSoak extends HornetQExample
+{
+
+   /** Number of messages loaded onto the input queue before the soak clients are started. */
+   public static final int MIN_MESSAGES_ON_QUEUE = 50000;
+
+   /** Number of messages sent between commits while pre-loading the input queue. */
+   private static final int PRELOAD_COMMIT_INTERVAL = 1000;
+
+   /** Total wall-clock duration of the soak run. */
+   private static final long TEST_DURATION_MILLIS = TimeUnit.HOURS.toMillis(1);
+
+   private static final Logger log = Logger.getLogger(TXRestartSoak.class.getName());
+
+   public static void main(final String[] args)
+   {
+      new TXRestartSoak().run(args);
+   }
+
+   private TXRestartSoak()
+   {
+      super();
+   }
+
+   /**
+    * Pre-loads the input queue, starts one {@link Sender} and two {@link Receiver}s and
+    * lets them run for one hour. When {@code runServer} is set, the servers are stopped
+    * and started again in a loop during that hour; otherwise the clients are simply left
+    * running against whatever server they are connected to.
+    *
+    * @return {@code true} when none of the clients reported errors
+    * @throws Exception on any JNDI, JMS or interruption failure
+    * @see org.hornetq.common.example.HornetQExample#runExample()
+    */
+   @Override
+   public boolean runExample() throws Exception
+   {
+      Connection connection = null;
+      InitialContext initialContext = null;
+
+      try
+      {
+         // Step 1. Create an initial context to perform the JNDI lookup.
+         initialContext = getContext(0);
+
+         // Step 2. Perform a lookup on the connection factory and the queue
+         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+         Queue queue = (Queue)initialContext.lookup("/queue/inputQueue");
+
+         // Step 3. Create the JMS objects
+         connection = cf.createConnection();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer producer = session.createProducer(queue);
+
+         // Step 4. Pre-load the queue, committing after every full batch
+         for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
+         {
+            BytesMessage msg = session.createBytesMessage();
+            msg.setLongProperty("count", i);
+            msg.writeBytes(new byte[10 * 1024]);
+            producer.send(msg);
+
+            // Count messages actually sent so the first commit/report happens after a
+            // full batch rather than after the very first message.
+            int sent = i + 1;
+            if (sent % PRELOAD_COMMIT_INTERVAL == 0)
+            {
+               System.out.println("Sent " + sent + " messages");
+               session.commit();
+            }
+         }
+
+         // Commit whatever is left over from a trailing partial batch
+         session.commit();
+
+         // Step 5. Start the soak clients
+         Sender send = new Sender();
+         send.start();
+
+         Receiver rec1 = new Receiver("/queue/diverted1");
+         rec1.start();
+         Receiver rec2 = new Receiver("/queue/diverted2");
+         rec2.start();
+
+         // Step 6. Keep the clients busy for the duration of the test
+         long timeEnd = System.currentTimeMillis() + TEST_DURATION_MILLIS;
+         if (runServer)
+         {
+            while (timeEnd > System.currentTimeMillis())
+            {
+               System.out.println("Letting the service run for 20 seconds");
+               Thread.sleep(TimeUnit.SECONDS.toMillis(20));
+               stopServers();
+
+               Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+               if (hasErrors(send, rec1, rec2))
+               {
+                  System.out.println("There are sequence errors in some of the clients, please look at the logs");
+                  break;
+               }
+
+               // Only bring the servers back once every client has dropped its connection
+               waitForClientsToDisconnect(send, rec1, rec2);
+
+               startServers();
+            }
+         }
+         else
+         {
+            // Nothing to restart: sleep until the deadline instead of spinning on the clock
+            long remaining = timeEnd - System.currentTimeMillis();
+            if (remaining > 0)
+            {
+               Thread.sleep(remaining);
+            }
+         }
+
+         // Step 7. Ask the clients to stop and wait for them to finish
+         send.setRunning(false);
+         rec1.setRunning(false);
+         rec2.setRunning(false);
+
+         send.join();
+         rec1.join();
+         rec2.join();
+
+         return !hasErrors(send, rec1, rec2);
+      }
+      finally
+      {
+         // Either resource may still be null if setup failed early; guard both so that a
+         // NullPointerException here cannot mask the original failure.
+         try
+         {
+            if (connection != null)
+            {
+               connection.close();
+            }
+         }
+         finally
+         {
+            if (initialContext != null)
+            {
+               initialContext.close();
+            }
+         }
+      }
+   }
+
+   /** Tells whether any of the three clients has a non-zero error count. */
+   private static boolean hasErrors(final Sender send, final Receiver rec1, final Receiver rec2)
+   {
+      return send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0;
+   }
+
+   /** Polls once a second until none of the three clients holds a connection any more. */
+   private static void waitForClientsToDisconnect(final Sender send, final Receiver rec1, final Receiver rec2) throws InterruptedException
+   {
+      while (send.getConnection() != null || rec1.getConnection() != null || rec2.getConnection() != null)
+      {
+         System.out.println("NOT ALL THE CLIENTS ARE DISCONNECTED, NEED TO WAIT THEM");
+         Thread.sleep(1000);
+      }
+   }
+}



More information about the hornetq-commits mailing list