[jboss-cvs] JBoss Messaging SVN: r7778 - in trunk: examples/soak/normal and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 18 12:12:41 EDT 2009


Author: jmesnil
Date: 2009-08-18 12:12:39 -0400 (Tue, 18 Aug 2009)
New Revision: 7778

Added:
   trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakBase.java
   trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakParams.java
   trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakReceiver.java
   trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakSender.java
Removed:
   trunk/examples/soak/normal/src/org/jboss/jms/soak/example/reconnect/
Modified:
   trunk/.classpath
   trunk/examples/soak/normal/README
   trunk/examples/soak/normal/build.xml
Log:
JBMESSAGING-1589: Soak tests

* removed Soak classes
* document system prop soak.props to specify the soak properties using Ant

Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2009-08-18 16:01:34 UTC (rev 7777)
+++ trunk/.classpath	2009-08-18 16:12:39 UTC (rev 7778)
@@ -87,7 +87,6 @@
 	<classpathentry kind="src" path="examples/javaee/servlet-transport/src"/>
 	<classpathentry kind="src" path="examples/javaee/xarecovery/src"/>
 	<classpathentry kind="src" path="examples/soak/normal/src"/>
-	<classpathentry kind="src" path="examples/soak/reconnect/src"/>
 	<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-logging/lib/commons-logging.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-xerces/lib/xercesImpl.jar"/>

Modified: trunk/examples/soak/normal/README
===================================================================
--- trunk/examples/soak/normal/README	2009-08-18 16:01:34 UTC (rev 7777)
+++ trunk/examples/soak/normal/README	2009-08-18 16:12:39 UTC (rev 7778)
@@ -47,14 +47,15 @@
 The duration of the tests is configured by duration-in-minutes (defaults to 2 minutes, set to 
 -1 to run the test indefinitely).
 
+To configure the soak properties to run the client, use the system property soak.props.
 To specify the JNDI server to connect to, use the system property jndi.address
 (defaults to localhost) and jndi.port (defaults to 1099)
 
-To run a receiver connected to the local server:
+To run a receiver connected to the local server with soak.properties
   $  ant runReceiver
 
-To run a sender connected to another server: 
-  $ ant runSender -Djndi.address=172.16.8.10 -Djndi.port=2099
+To run a sender connected to another server using settings in paging-soak.properties:
+  $ ant runSender -Djndi.address=172.16.8.10 -Djndi.port=2099 -Dsoak.props=paging-soak.properties
 
 Every 1000th message, the clients will display their recent activity:
 

Modified: trunk/examples/soak/normal/build.xml
===================================================================
--- trunk/examples/soak/normal/build.xml	2009-08-18 16:01:34 UTC (rev 7777)
+++ trunk/examples/soak/normal/build.xml	2009-08-18 16:12:39 UTC (rev 7778)
@@ -35,7 +35,8 @@
     <property name="server.dir" value="server0" />
     <property name="jndi.address" value="localhost" />
     <property name="jndi.port" value="1099" />
-
+    <property name="soak.props" value="soak.properties" />
+	
 	<path id="extra.classpath">
 		<path location="${server.dir}" />
 		<fileset dir="extra-libs">
@@ -50,7 +51,7 @@
 
 	<target name="runSender" depends="compile">
 	    <property name="jndi.url" value="jnp://${jndi.address}:${jndi.port}" />
-		<java classname="org.jboss.jms.soak.example.reconnect.SoakReconnectableSender" fork="true" resultproperty="example-result">
+		<java classname="org.jboss.jms.soak.example.SoakSender" fork="true" resultproperty="example-result">
 			<jvmarg value="-Xms512M" />
 			<jvmarg value="-Xmx512M" />
 			<jvmarg value="-XX:+UseParallelGC" />
@@ -62,13 +63,13 @@
              -->
 			<jvmarg value="-Djava.util.logging.config.file=${config.dir}/logging.properties" />
 			<classpath refid="the.classpath" />
-			<arg line="${jndi.url}" />
+			<arg line="${jndi.url} ${soak.props}" />
 		</java>
 	</target>
 
 	<target name="runReceiver" depends="compile">
 	    <property name="jndi.url" value="jnp://${jndi.address}:${jndi.port}" />
-		<java classname="org.jboss.jms.soak.example.reconnect.SoakReconnectableReceiver" fork="true" resultproperty="example-result">
+		<java classname="org.jboss.jms.soak.example.SoakReceiver" fork="true" resultproperty="example-result">
 			<jvmarg value="-Xms512M" />
 			<jvmarg value="-Xmx512M" />
 			<jvmarg value="-XX:+UseParallelGC" />
@@ -80,7 +81,7 @@
              -->
 			<jvmarg value="-Djava.util.logging.config.file=${config.dir}/logging.properties" />
 			<classpath refid="the.classpath" />
-			<arg line="${jndi.url}" />
+			<arg line="${jndi.url} ${soak.props}" />
 		</java>
 	</target>
 

Copied: trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakBase.java (from rev 7777, trunk/examples/soak/normal/src/org/jboss/jms/soak/example/reconnect/SoakBase.java)
===================================================================
--- trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakBase.java	                        (rev 0)
+++ trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakBase.java	2009-08-18 16:12:39 UTC (rev 7778)
@@ -0,0 +1,143 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.jms.soak.example;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.Random;
+import java.util.logging.Logger;
+
+/**
+ * 
+ * A SoakBase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class SoakBase
+{
+   private static final Logger log = Logger.getLogger(SoakBase.class.getName());
+
+   private static final String DEFAULT_SOAK_PROPERTIES_FILE_NAME = "soak.properties";
+
+   public static final int TO_MILLIS = 60 * 1000; // from minute to milliseconds
+
+   public static byte[] randomByteArray(final int length)
+   {
+      byte[] bytes = new byte[length];
+
+      Random random = new Random();
+
+      for (int i = 0; i < length; i++)
+      {
+         bytes[i] = Integer.valueOf(random.nextInt()).byteValue();
+      }
+
+      return bytes;
+   }
+
+   protected static String getPerfFileName(String[] args)
+   {
+      String fileName;
+
+      if (args.length > 1)
+      {
+         fileName = args[1];
+      }
+      else
+      {
+         fileName = DEFAULT_SOAK_PROPERTIES_FILE_NAME;
+      }
+      return fileName;
+   }
+
+   protected static SoakParams getParams(final String fileName) throws Exception
+   {
+      Properties props = null;
+
+      InputStream is = null;
+
+      try
+      {
+         is = new FileInputStream(fileName);
+
+         props = new Properties();
+
+         props.load(is);
+      }
+      finally
+      {
+         if (is != null)
+         {
+            is.close();
+         }
+      }
+
+      int durationInMinutes = Integer.valueOf(props.getProperty("duration-in-minutes"));
+      int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));
+      int messageSize = Integer.valueOf(props.getProperty("message-size"));
+      boolean durable = Boolean.valueOf(props.getProperty("durable"));
+      boolean transacted = Boolean.valueOf(props.getProperty("transacted"));
+      int batchSize = Integer.valueOf(props.getProperty("batch-size"));
+      boolean drainQueue = Boolean.valueOf(props.getProperty("drain-queue"));
+      String destinationLookup = props.getProperty("destination-lookup");
+      String connectionFactoryLookup = props.getProperty("connection-factory-lookup");
+      int throttleRate = Integer.valueOf(props.getProperty("throttle-rate"));
+      boolean dupsOK = Boolean.valueOf(props.getProperty("dups-ok-acknowlege"));
+      boolean disableMessageID = Boolean.valueOf(props.getProperty("disable-message-id"));
+      boolean disableTimestamp = Boolean.valueOf(props.getProperty("disable-message-timestamp"));
+
+      log.info("duration-in-minutes: " + durationInMinutes);
+      log.info("num-warmup-messages: " + noOfWarmupMessages);
+      log.info("message-size: " + messageSize);
+      log.info("durable: " + durable);
+      log.info("transacted: " + transacted);
+      log.info("batch-size: " + batchSize);
+      log.info("drain-queue: " + drainQueue);
+      log.info("throttle-rate: " + throttleRate);
+      log.info("connection-factory-lookup: " + connectionFactoryLookup);
+      log.info("destination-lookup: " + destinationLookup);
+      log.info("disable-message-id: " + disableMessageID);
+      log.info("disable-message-timestamp: " + disableTimestamp);
+      log.info("dups-ok-acknowledge: " + dupsOK);
+
+      SoakParams soakParams = new SoakParams();
+      soakParams.setDurationInMinutes(durationInMinutes);
+      soakParams.setNoOfWarmupMessages(noOfWarmupMessages);
+      soakParams.setMessageSize(messageSize);
+      soakParams.setDurable(durable);
+      soakParams.setSessionTransacted(transacted);
+      soakParams.setBatchSize(batchSize);
+      soakParams.setDrainQueue(drainQueue);
+      soakParams.setConnectionFactoryLookup(connectionFactoryLookup);
+      soakParams.setDestinationLookup(destinationLookup);
+      soakParams.setThrottleRate(throttleRate);
+      soakParams.setDisableMessageID(disableMessageID);
+      soakParams.setDisableTimestamp(disableTimestamp);
+      soakParams.setDupsOK(dupsOK);
+
+      return soakParams;
+   }
+}

Copied: trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakParams.java (from rev 7777, trunk/examples/soak/normal/src/org/jboss/jms/soak/example/reconnect/SoakParams.java)
===================================================================
--- trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakParams.java	                        (rev 0)
+++ trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakParams.java	2009-08-18 16:12:39 UTC (rev 7778)
@@ -0,0 +1,157 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.jms.soak.example;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Class that holds the parameters used in the performance examples
+ * 
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class SoakParams implements Serializable
+{
+   private static final long serialVersionUID = -4336539641012356002L;
+   
+   private int durationInMinutes = 60;
+   private int noOfWarmupMessages;
+   private int messageSize = 1024; // in bytes
+   private boolean durable = false;
+   private boolean isSessionTransacted = false;
+   private int batchSize = 5000;
+   private boolean drainQueue = true;
+   private String connectionFactoryLookup;
+   private String destinationLookup;
+   private int throttleRate;
+   private boolean disableMessageID;
+   private boolean disableTimestamp;
+   private boolean dupsOK;
+   
+   public synchronized int getDurationInMinutes()
+   {
+      return durationInMinutes;
+   }
+   public synchronized void setDurationInMinutes(int durationInMinutes)
+   {
+      this.durationInMinutes = durationInMinutes;
+   }
+   public synchronized int getNoOfWarmupMessages()
+   {
+      return noOfWarmupMessages;
+   }
+   public synchronized void setNoOfWarmupMessages(int noOfWarmupMessages)
+   {
+      this.noOfWarmupMessages = noOfWarmupMessages;
+   }
+   public synchronized int getMessageSize()
+   {
+      return messageSize;
+   }
+   public synchronized void setMessageSize(int messageSize)
+   {
+      this.messageSize = messageSize;
+   }
+   public synchronized boolean isDurable()
+   {
+      return durable;
+   }
+   public synchronized void setDurable(boolean durable)
+   {
+      this.durable = durable;
+   }
+   public synchronized boolean isSessionTransacted()
+   {
+      return isSessionTransacted;
+   }
+   public synchronized void setSessionTransacted(boolean isSessionTransacted)
+   {
+      this.isSessionTransacted = isSessionTransacted;
+   }
+   public synchronized int getBatchSize()
+   {
+      return batchSize;
+   }
+   public synchronized void setBatchSize(int batchSize)
+   {
+      this.batchSize = batchSize;
+   }
+   public synchronized boolean isDrainQueue()
+   {
+      return drainQueue;
+   }
+   public synchronized void setDrainQueue(boolean drainQueue)
+   {
+      this.drainQueue = drainQueue;
+   }
+   public synchronized String getConnectionFactoryLookup()
+   {
+      return connectionFactoryLookup;
+   }
+   public synchronized void setConnectionFactoryLookup(String connectionFactoryLookup)
+   {
+      this.connectionFactoryLookup = connectionFactoryLookup;
+   }
+   public synchronized String getDestinationLookup()
+   {
+      return destinationLookup;
+   }
+   public synchronized void setDestinationLookup(String destinationLookup)
+   {
+      this.destinationLookup = destinationLookup;
+   }
+   public synchronized int getThrottleRate()
+   {
+      return throttleRate;
+   }
+   public synchronized void setThrottleRate(int throttleRate)
+   {
+      this.throttleRate = throttleRate;
+   }
+   public synchronized boolean isDisableMessageID()
+   {
+      return disableMessageID;
+   }
+   public synchronized void setDisableMessageID(boolean disableMessageID)
+   {
+      this.disableMessageID = disableMessageID;
+   }
+   public synchronized boolean isDisableTimestamp()
+   {
+      return disableTimestamp;
+   }
+   public synchronized void setDisableTimestamp(boolean disableTimestamp)
+   {
+      this.disableTimestamp = disableTimestamp;
+   }
+   public synchronized boolean isDupsOK()
+   {
+      return dupsOK;
+   }
+   public synchronized void setDupsOK(boolean dupsOK)
+   {
+      this.dupsOK = dupsOK;
+   }   
+     
+
+
+}

Copied: trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakReceiver.java (from rev 7777, trunk/examples/soak/normal/src/org/jboss/jms/soak/example/reconnect/SoakReconnectableReceiver.java)
===================================================================
--- trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakReceiver.java	                        (rev 0)
+++ trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakReceiver.java	2009-08-18 16:12:39 UTC (rev 7778)
@@ -0,0 +1,238 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.jms.soak.example;
+
+import java.util.Hashtable;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+public class SoakReceiver
+{
+   private static final Logger log = Logger.getLogger(SoakReceiver.class.getName());
+
+   private static final String EOF = UUID.randomUUID().toString();
+
+   public static void main(String[] args)
+   {
+      for (int i = 0; i < args.length; i++)
+      {
+         System.out.println(i + ":" + args[i]);
+      }
+      String jndiURL = "jndi://localhost:1099";
+      if (args.length > 0)
+      {
+         jndiURL = args[0];
+      }
+      
+      System.out.println("Connecting to JNDI at " + jndiURL);
+      
+      try
+      {
+         String fileName = SoakBase.getPerfFileName(args);
+
+         SoakParams params = SoakBase.getParams(fileName);
+
+         Hashtable<String, String> jndiProps = new Hashtable<String, String>();
+         jndiProps.put("java.naming.provider.url", jndiURL);
+         jndiProps.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
+         jndiProps.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
+
+         final SoakReceiver receiver = new SoakReceiver(jndiProps, params);
+
+         Runtime.getRuntime().addShutdownHook(new Thread()
+         {
+            @Override
+            public void run()
+            {
+               receiver.disconnect();
+            }
+         });
+
+         receiver.run();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   private final Hashtable<String, String> jndiProps;
+
+   private final SoakParams perfParams;
+
+   private final ExceptionListener exceptionListener = new ExceptionListener()
+   {
+      public void onException(JMSException e)
+      {
+         disconnect();
+         connect();
+      }
+   };
+
+   private final MessageListener listener = new MessageListener()
+   {
+      int modulo = 10000;
+
+      private final AtomicLong count = new AtomicLong(0);
+
+      private long start = System.currentTimeMillis();
+      long moduloStart = start;
+
+      public void onMessage(Message msg)
+      {
+         long totalDuration = System.currentTimeMillis() - start;
+
+         try
+         {
+            if (EOF.equals(msg.getStringProperty("eof")))
+            {
+               log.info(String.format("Received %s messages in %.2f minutes", count, (1.0 * totalDuration) / SoakBase.TO_MILLIS));
+               log.info("END OF RUN");
+
+               return;
+            }
+         }
+         catch (JMSException e1)
+         {
+            e1.printStackTrace();
+         }
+         if (count.incrementAndGet() % modulo == 0)
+         {
+            double duration = (1.0 * System.currentTimeMillis() - moduloStart) / 1000;
+            moduloStart = System.currentTimeMillis();
+            log.info(String.format("received %s messages in %2.2fs (total: %.0fs)", modulo, duration, totalDuration / 1000.0));
+         }
+      }
+   };
+
+   private Session session;
+
+   private Connection connection;
+
+   private SoakReceiver(final Hashtable<String, String> jndiProps, final SoakParams perfParams)
+   {
+      this.jndiProps = jndiProps;
+      this.perfParams = perfParams;
+   }
+
+   public void run() throws Exception
+   {
+      connect();
+
+      boolean runInfinitely = (perfParams.getDurationInMinutes() == -1);
+
+      if (!runInfinitely)
+      {
+         Thread.sleep(perfParams.getDurationInMinutes() * SoakBase.TO_MILLIS);
+
+         // send EOF message
+         Message eof = session.createMessage();
+         eof.setStringProperty("eof", EOF);
+         listener.onMessage(eof);
+         
+         if (connection != null)
+         {
+            connection.close();
+            connection = null;
+         }
+      } else
+      {
+         while (true)
+         {
+            Thread.sleep(500);
+         }
+      }
+   }
+
+   private void disconnect()
+   {
+      if (connection != null)
+      {
+         try
+         {
+            connection.setExceptionListener(null);
+            connection.close();
+         }
+         catch (JMSException e)
+         {
+            e.printStackTrace();
+         }
+         finally
+         {
+            connection = null;
+         }
+      }
+   }
+
+   private void connect()
+   {
+      InitialContext ic = null;
+      try
+      {
+         ic = new InitialContext(jndiProps);
+
+         ConnectionFactory factory = (ConnectionFactory)ic.lookup(perfParams.getConnectionFactoryLookup());
+
+         Destination destination = (Destination)ic.lookup(perfParams.getDestinationLookup());
+
+         connection = factory.createConnection();
+         connection.setExceptionListener(exceptionListener);
+
+         session = connection.createSession(perfParams.isSessionTransacted(),
+                                                    perfParams.isDupsOK() ? Session.DUPS_OK_ACKNOWLEDGE
+                                                                         : Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer messageConsumer = session.createConsumer(destination);
+         messageConsumer.setMessageListener(listener);
+
+         connection.start();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         try
+         {
+            ic.close();
+         }
+         catch (NamingException e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+}
\ No newline at end of file

Copied: trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakSender.java (from rev 7777, trunk/examples/soak/normal/src/org/jboss/jms/soak/example/reconnect/SoakReconnectableSender.java)
===================================================================
--- trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakSender.java	                        (rev 0)
+++ trunk/examples/soak/normal/src/org/jboss/jms/soak/example/SoakSender.java	2009-08-18 16:12:39 UTC (rev 7778)
@@ -0,0 +1,257 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.jms.soak.example;
+
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.jboss.messaging.utils.TokenBucketLimiter;
+import org.jboss.messaging.utils.TokenBucketLimiterImpl;
+
+public class SoakSender
+{
+   private static final Logger log = Logger.getLogger(SoakSender.class.getName());
+
+
+   public static void main(String[] args)
+   {
+      for (int i = 0; i < args.length; i++)
+      {
+         System.out.println(i + ":" + args[i]);
+      }
+      String jndiURL = "jndi://localhost:1099";
+      if (args.length > 0)
+      {
+         jndiURL = args[0];
+      }
+      
+      System.out.println("Connecting to JNDI at " + jndiURL);
+      try
+      {
+         String fileName = SoakBase.getPerfFileName(args);
+
+         SoakParams params = SoakBase.getParams(fileName);
+
+         Hashtable<String, String> jndiProps = new Hashtable<String, String>();
+         jndiProps.put("java.naming.provider.url", jndiURL);
+         jndiProps.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
+         jndiProps.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
+
+         final SoakSender sender = new SoakSender(jndiProps, params);
+
+         Runtime.getRuntime().addShutdownHook(new Thread()
+         {
+            @Override
+            public void run()
+            {
+               sender.disconnect();
+            }
+         });
+
+         sender.run();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   private final SoakParams perfParams;
+
+   private final Hashtable<String, String> jndiProps;
+
+   private Connection connection;
+
+   private Session session;
+
+   private MessageProducer producer;
+
+   private final ExceptionListener exceptionListener = new ExceptionListener()
+   {
+      public void onException(JMSException e)
+      {
+         System.out.println("SoakReconnectableSender.exceptionListener.new ExceptionListener() {...}.onException()");
+         disconnect();
+         connect();
+      }
+
+   };
+
+   private SoakSender(final Hashtable<String, String> jndiProps, final SoakParams perfParams)
+   {
+      this.jndiProps = jndiProps;
+      this.perfParams = perfParams;
+   }
+
+   public void run() throws Exception
+   {
+      connect();
+
+      boolean runInfinitely = (perfParams.getDurationInMinutes() == -1);
+      
+      BytesMessage message = session.createBytesMessage();
+
+      byte[] payload = SoakBase.randomByteArray(perfParams.getMessageSize());
+
+      message.writeBytes(payload);
+
+      final int modulo = 10000;
+
+      TokenBucketLimiter tbl = perfParams.getThrottleRate() != -1 ? new TokenBucketLimiterImpl(perfParams.getThrottleRate(),
+                                                                                               false)
+                                                                 : null;
+
+      boolean transacted = perfParams.isSessionTransacted();
+      int txBatchSize = perfParams.getBatchSize();
+      boolean display = true;
+
+      long start = System.currentTimeMillis();
+      long moduleStart = start;
+      AtomicLong count = new AtomicLong(0);
+      while (true)
+      {
+         try
+         {
+            producer.send(message);
+            count.incrementAndGet();
+
+            if (transacted)
+            {
+               if (count.longValue() % txBatchSize == 0)
+               {
+                  session.commit();
+               }
+            }
+
+            long totalDuration = System.currentTimeMillis() - start;
+
+            if (display && (count.longValue() % modulo == 0))
+            {
+               double duration = (1.0 * System.currentTimeMillis() - moduleStart) / 1000;
+               moduleStart = System.currentTimeMillis();
+               log.info(String.format("sent %s messages in %2.2fs (time: %.0fs)", modulo, duration, totalDuration / 1000.0));
+            }
+
+            if (tbl != null)
+            {
+               tbl.limit();
+            }
+            
+            if (!runInfinitely && totalDuration > perfParams.getDurationInMinutes() * SoakBase.TO_MILLIS)
+            {
+               break;
+            }
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+      
+      log.info(String.format("Sent %s messages in %s minutes", count, perfParams.getDurationInMinutes()));
+      log.info("END OF RUN");
+
+      
+      if (connection != null)
+      {
+         connection.close();
+         connection = null;
+      }
+   }
+
+   private synchronized void disconnect()
+   {
+      if (connection != null)
+      {
+         try
+         {
+            connection.setExceptionListener(null);
+            connection.close();
+         }
+         catch (JMSException e)
+         {
+            e.printStackTrace();
+         }
+         finally
+         {
+            connection = null;
+         }
+      }
+   }
+
+   private void connect()
+   {
+      InitialContext ic = null;
+      try
+      {
+         ic = new InitialContext(jndiProps);
+
+         ConnectionFactory factory = (ConnectionFactory)ic.lookup(perfParams.getConnectionFactoryLookup());
+
+         Destination destination = (Destination)ic.lookup(perfParams.getDestinationLookup());
+
+         connection = factory.createConnection();
+
+         session = connection.createSession(perfParams.isSessionTransacted(),
+                                            perfParams.isDupsOK() ? Session.DUPS_OK_ACKNOWLEDGE
+                                                                 : Session.AUTO_ACKNOWLEDGE);
+
+         producer = session.createProducer(destination);
+
+         producer.setDeliveryMode(perfParams.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         producer.setDisableMessageID(perfParams.isDisableMessageID());
+
+         producer.setDisableMessageTimestamp(perfParams.isDisableTimestamp());
+
+         connection.setExceptionListener(exceptionListener);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         try
+         {
+            ic.close();
+         }
+         catch (NamingException e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+}
\ No newline at end of file




More information about the jboss-cvs-commits mailing list