[jboss-cvs] JBoss Messaging SVN: r6966 - in trunk: docs/user-manual/en and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 21 09:50:52 EDT 2009


Author: timfox
Date: 2009-05-21 09:50:51 -0400 (Thu, 21 May 2009)
New Revision: 6966

Added:
   trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
Removed:
   trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java
Modified:
   trunk/.classpath
   trunk/docs/user-manual/en/using-core.xml
   trunk/examples/core/perf/
   trunk/examples/core/perf/build.xml
   trunk/examples/core/perf/perf.properties
   trunk/examples/core/perf/server0/jbm-configuration.xml
   trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java
   trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java
   trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java
   trunk/examples/jms/paging/build.xml
   trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/utils/XMLUtil.java
Log:
perf example

Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/.classpath	2009-05-21 13:50:51 UTC (rev 6966)
@@ -15,6 +15,7 @@
 	<classpathentry kind="src" path="tests/joram-tests/src"/>
 	<classpathentry kind="src" path="tests/joram-tests/config"/>
 	<classpathentry kind="src" path="examples/core/embedded/src"/>
+	<classpathentry kind="src" path="examples/core/perf/src"/>
 	<classpathentry kind="src" path="examples/jms/application-layer-failover/src"/>
 	<classpathentry kind="src" path="examples/jms/automatic-failover/src"/>
 	<classpathentry kind="src" path="examples/jms/bridge/src"/>

Modified: trunk/docs/user-manual/en/using-core.xml
===================================================================
--- trunk/docs/user-manual/en/using-core.xml	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/docs/user-manual/en/using-core.xml	2009-05-21 13:50:51 UTC (rev 6966)
@@ -107,7 +107,7 @@
                       new TransportConfiguration(
                               InVMConnectorFactory.class.getName()), null);
 
-ClientSession session = factory.createSession();
+ClientSession session = nettyFactory.createSession();
 session.createQueue("example", "example", true);
 
 ClientProducer producer = session.createProducer("example");


Property changes on: trunk/examples/core/perf
___________________________________________________________________
Name: svn:ignore
   + build
data
logs


Modified: trunk/examples/core/perf/build.xml
===================================================================
--- trunk/examples/core/perf/build.xml	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/build.xml	2009-05-21 13:50:51 UTC (rev 6966)
@@ -39,7 +39,7 @@
        <path location="server0"/>
    </path>
 
-   <target name="runSender">
+   <target name="runSender" depends="compile">
       <java classname="org.jboss.core.example.PerfSender" fork="true" resultproperty="example-result">
 		   <jvmarg value="-Xms512M"/>
 		   <jvmarg value="-Xmx512M"/>
@@ -50,8 +50,8 @@
       </java>
    </target>
 
-   <target name="runListener">
-      <java classname="org.jboss.core.example.PerfSender" fork="true" resultproperty="example-result">
+   <target name="runListener" depends="compile">
+      <java classname="org.jboss.core.example.PerfListener" fork="true" resultproperty="example-result">
          <jvmarg value="-Xms512M"/>
 		   <jvmarg value="-Xmx512M"/>
          <jvmarg value="-XX:+UseParallelGC"/>

Modified: trunk/examples/core/perf/perf.properties
===================================================================
--- trunk/examples/core/perf/perf.properties	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/perf.properties	2009-05-21 13:50:51 UTC (rev 6966)
@@ -1,9 +1,10 @@
-num-messages=20000
-num-warmup-messages=2000
-message-size=1400
+num-messages=1000000
+num-warmup-messages=50000
+message-size=0
 durable=false
 transacted=false
 batch-size=1000
 drain-queue=true
 queue-name=perfQueue
-throttle-rate=20000
\ No newline at end of file
+throttle-rate=-1
+address=perfAddress
\ No newline at end of file

Modified: trunk/examples/core/perf/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/jbm-configuration.xml	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/server0/jbm-configuration.xml	2009-05-21 13:50:51 UTC (rev 6966)
@@ -6,10 +6,21 @@
    <!-- Acceptors -->
    <acceptors>
       <acceptor name="netty-acceptor">
-         <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>         
+         <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>    
+         <!-- The buffer sizes are numeric (1048576 = 1024 * 1024, the same values the perf client sets), so they are typed Integer; only tcpnodelay is a Boolean -->
+         <param key="jbm.remoting.netty.tcpnodelay" value="false" type="Boolean"/>
+         <param key="jbm.remoting.netty.tcpsendbuffersize" value="1048576" type="Integer"/>
+         <param key="jbm.remoting.netty.tcpreceivebuffersize" value="1048576" type="Integer"/>
       </acceptor>
    </acceptors>
    
    <security-enabled>false</security-enabled>
+   
+   <persistence-enabled>false</persistence-enabled>
+   
+   <queues>
+	   <queue name="perfQueue">
+	   	<address>perfAddress</address>
+	   </queue>
+   </queues>
 
 </configuration>

Added: trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java	                        (rev 0)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -0,0 +1,419 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.core.example;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
+import org.jboss.messaging.integration.transports.netty.TransportConstants;
+import org.jboss.messaging.utils.TokenBucketLimiter;
+import org.jboss.messaging.utils.TokenBucketLimiterImpl;
+
+/**
+ * 
+ * A PerfBase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public abstract class PerfBase
+{
+   private static final Logger log = Logger.getLogger(PerfSender.class.getName());
+
+   protected static PerfParams getParams() throws Exception
+   {
+      Properties props = null;
+
+      InputStream is = null;
+
+      try
+      {
+         is = new FileInputStream("perf.properties");
+
+         props = new Properties();
+
+         props.load(is);
+      }
+      finally
+      {
+         if (is != null)
+         {
+            is.close();
+         }
+      }
+
+      int noOfMessages = Integer.valueOf(props.getProperty("num-messages"));
+      int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));
+      int messageSize = Integer.valueOf(props.getProperty("message-size"));
+      boolean durable = Boolean.valueOf(props.getProperty("durable"));
+      boolean transacted = Boolean.valueOf(props.getProperty("transacted"));
+      int batchSize = Integer.valueOf(props.getProperty("batch-size"));
+      boolean drainQueue = Boolean.valueOf(props.getProperty("drain-queue"));
+      String queueName = props.getProperty("queue-name");
+      String address = props.getProperty("address");
+      int throttleRate = Integer.valueOf(props.getProperty("throttle-rate"));
+
+      log.info("num-messages: " + noOfMessages);
+      log.info("num-warmup-messages: " + noOfWarmupMessages);
+      log.info("message-size: " + messageSize);
+      log.info("durable: " + durable);
+      log.info("transacted: " + transacted);
+      log.info("batch-size: " + batchSize);
+      log.info("drain-queue: " + drainQueue);
+      log.info("address: " + address);
+      log.info("throttle-rate: " + throttleRate);
+
+      PerfParams perfParams = new PerfParams();
+      perfParams.setNoOfMessagesToSend(noOfMessages);
+      perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
+      perfParams.setMessageSize(messageSize);
+      perfParams.setDurable(durable);
+      perfParams.setSessionTransacted(transacted);
+      perfParams.setBatchSize(batchSize);
+      perfParams.setDrainQueue(drainQueue);
+      perfParams.setQueueName(queueName);
+      perfParams.setAddress(address);
+      perfParams.setThrottleRate(throttleRate);
+
+      return perfParams;
+   }
+
+   private final PerfParams perfParams;
+
+   protected PerfBase(final PerfParams perfParams)
+   {
+      this.perfParams = perfParams;
+   }
+
+   private ClientSessionFactory factory;
+
+   private ClientSession session;
+
+   private long start;
+
+   private void init(final boolean transacted, final String queueName) throws Exception
+   {
+      Map<String, Object> params = new HashMap<String, Object>();
+
+      params.put(TransportConstants.TCP_NODELAY_PROPNAME, false);
+      params.put(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, 1024 * 1024);
+      params.put(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, 1024 * 1024);
+
+      factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName(), params));
+
+      factory.setAckBatchSize(perfParams.getBatchSize());
+
+      session = factory.createSession(!transacted, !transacted);
+   }
+
+   private void displayAverage(final long numberOfMessages, final long start, final long end)
+   {
+      double duration = (1.0 * end - start) / 1000; // in seconds
+      double average = (1.0 * numberOfMessages / duration);
+      log.info(String.format("average: %.2f msg/s (%d messages in %2.2fs)", average, numberOfMessages, duration));
+   }
+
+   protected void runSender()
+   {
+      try
+      {
+         log.info("params = " + perfParams);
+         init(perfParams.isSessionTransacted(), perfParams.getQueueName());
+
+         if (perfParams.isDrainQueue())
+         {
+            drainQueue();
+         }
+
+         start = System.currentTimeMillis();
+         log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
+         sendMessages(perfParams.getNoOfWarmupMessages(),
+                      perfParams.getBatchSize(),
+                      perfParams.isDurable(),
+                      perfParams.isSessionTransacted(),
+                      false,
+                      perfParams.getThrottleRate(),
+                      perfParams.getMessageSize());
+         log.info("warmed up");
+         start = System.currentTimeMillis();
+         sendMessages(perfParams.getNoOfMessagesToSend(),
+                      perfParams.getBatchSize(),
+                      perfParams.isDurable(),
+                      perfParams.isSessionTransacted(),
+                      true,
+                      perfParams.getThrottleRate(),
+                      perfParams.getMessageSize());
+         long end = System.currentTimeMillis();
+         displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+   
+   protected void runListener()
+   {
+      try
+      {
+         
+         init(perfParams.isSessionTransacted(), perfParams.getQueueName());
+
+         if (perfParams.isDrainQueue())
+         {
+            drainQueue();
+         }
+                  
+         ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
+         
+         session.start();
+
+         log.info("READY!!!");
+
+         CountDownLatch countDownLatch = new CountDownLatch(1);
+         consumer.setMessageHandler(new PerfListener(countDownLatch, perfParams));
+         countDownLatch.await();
+         long end = System.currentTimeMillis();
+         // start was set on the first received message
+         displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (session != null)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   private void drainQueue() throws Exception
+   {
+      log.info("Draining queue");
+      ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
+
+      session.start();
+
+      ClientMessage message = null;
+
+      int count = 0;
+      do
+      {
+         message = consumer.receive(3000);
+
+         if (message != null)
+         {
+            message.acknowledge();
+
+            count++;
+         }
+      }
+      while (message != null);
+      
+      consumer.close();
+      
+      log.info("Drained " + count + " messages");
+   }
+
+   private void sendMessages(final int numberOfMessages,
+                             final int txBatchSize,
+                             final boolean durable,
+                             final boolean transacted,
+                             final boolean display,
+                             final int throttleRate,
+                             final int messageSize) throws Exception
+   {
+      ClientProducer producer = session.createProducer(perfParams.getAddress());
+
+      ClientMessage message = session.createClientMessage(durable);
+
+      byte[] payload = new byte[messageSize];
+
+      message.getBody().writeBytes(payload);
+
+      final int modulo = 2000;
+
+      TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false) : null;
+
+      boolean committed = false;
+      for (int i = 1; i <= numberOfMessages; i++)
+      {
+         producer.send(message);
+
+         if (transacted)
+         {
+            if (i % txBatchSize == 0)
+            {
+               session.commit();
+               committed = true;
+            }
+            else
+            {
+               committed = false;
+            }
+         }
+         if (display && (i % modulo == 0))
+         {
+            double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+            log.info(String.format("sent %6d messages in %2.2fs", i, duration));
+         }
+
+         if (tbl != null)
+         {
+            tbl.limit();
+         }
+      }
+      if (transacted && !committed)
+      {
+         session.commit();
+      }
+   }
+   
+   private class PerfListener implements MessageHandler
+   {
+      private final CountDownLatch countDownLatch;
+
+      private final PerfParams perfParams;
+
+      private boolean warmingUp = true;
+      
+      private boolean started = false;
+
+      private final int modulo;
+      
+      private final AtomicLong count = new AtomicLong(0);
+
+      public PerfListener(final CountDownLatch countDownLatch, final PerfParams perfParams)
+      {
+         this.countDownLatch = countDownLatch;
+         this.perfParams = perfParams;
+         warmingUp = perfParams.getNoOfWarmupMessages() > 0;
+         this.modulo = 2000;
+      }
+
+      public void onMessage(final ClientMessage message)
+      {
+         try
+         {
+            if (warmingUp)
+            {
+               boolean committed = checkCommit();
+               if (count.incrementAndGet() == perfParams.getNoOfWarmupMessages())
+               {
+                  log.info("warmed up after receiving " + count.longValue() + " msgs");
+                  if (!committed)
+                  {
+                     checkCommit();
+                  }
+                  warmingUp = false;
+               }
+               return;
+            }
+
+            if (!started)
+            {
+               started = true;
+               // reset count to take stats
+               count.set(0);
+               start = System.currentTimeMillis();
+            }
+            
+            message.acknowledge();
+
+            long currentCount = count.incrementAndGet();
+            boolean committed = checkCommit();
+            if (currentCount == perfParams.getNoOfMessagesToSend())
+            {
+               if (!committed)
+               {
+                  checkCommit();
+               }
+               countDownLatch.countDown();
+            }
+            if (currentCount % modulo == 0)
+            {
+               double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+               log.info(String.format("received %6d messages in %2.2fs", currentCount, duration));
+            }
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+
+      private boolean checkCommit() throws Exception
+      {
+         if (perfParams.isSessionTransacted())
+         {
+            if (count.longValue() % perfParams.getBatchSize() == 0)
+            {
+               session.commit();
+
+               return true;
+            }
+         }
+         return false;
+      }
+   }
+
+}

Deleted: trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -1,103 +0,0 @@
-/*
-   * JBoss, Home of Professional Open Source
-   * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
-   * by the @authors tag. See the copyright.txt in the distribution for a
-   * full listing of individual contributors.
-   *
-   * This is free software; you can redistribute it and/or modify it
-   * under the terms of the GNU Lesser General Public License as
-   * published by the Free Software Foundation; either version 2.1 of
-   * the License, or (at your option) any later version.
-   *
-   * This software is distributed in the hope that it will be useful,
-   * but WITHOUT ANY WARRANTY; without even the implied warranty of
-   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   * Lesser General Public License for more details.
-   *
-   * You should have received a copy of the GNU Lesser General Public
-   * License along with this software; if not, write to the Free
-   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-   */
-package org.jboss.core.example;
-
-import org.jboss.common.example.JBMExample;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
-
-/**
- * 
- * A PerfExample
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class PerfExample extends JBMExample
-{
-   public static void main(String[] args)
-   {
-      new PerfExample().run(args);
-   }
-
-   public boolean runExample() throws Exception
-   {
-      final String perfAddress = "perfAddress";
-      
-      final String perfQueueName = "perfQueue";
-      
-      ClientSessionFactory factory = null;
-
-      ClientSession session = null;
-
-      try
-      {
-         factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName()));
-
-         session = factory.createSession();
-         
-         session.createQueue(perfAddress, perfQueueName, true);
-
-         ClientProducer producer = session.createProducer(perfAddress);
-         
-         ClientMessage message = session.createClientMessage(true);
-
-         message.getBody().writeString("Hello");
-
-         producer.send(message);
-
-         session.start();
-
-         ClientConsumer consumer = session.createConsumer(perfQueueName);
-
-         ClientMessage msgReceived = consumer.receive();
-         
-         System.out.println("message = " + msgReceived.getBody().readString());
-         
-         consumer.close();
-         
-         session.deleteQueue(perfQueueName);
-                  
-         return true;
-      }
-      finally
-      {
-         if (session != null)
-         {           
-            session.close();
-         }
-
-         if (factory != null)
-         {
-            factory.close();
-         }
-      }
-   }
-
-}

Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -1,54 +1,62 @@
 /*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.core.example;
 
+import java.util.logging.Logger;
 
-package org.jboss.core.example;
-
 /**
+ * 
  * A PerfListener
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  *
  */
-public class PerfListener
+public class PerfListener extends PerfBase
 {
+   private static final Logger log = Logger.getLogger(PerfListener.class.getName());
 
-   // Constants -----------------------------------------------------
+   public static void main(String[] args)
+   {
+      try
+      {
+         PerfParams params = getParams();
 
-   // Attributes ----------------------------------------------------
+         new PerfListener(params).run();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
 
-   // Static --------------------------------------------------------
+   private PerfListener(final PerfParams perfParams)
+   {
+      super(perfParams);
+   }
 
-   // Constructors --------------------------------------------------
+   public void run() throws Exception
+   {
+      runListener();
+   }
 
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }

Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -41,6 +41,7 @@
    private int batchSize = 5000;
    private boolean drainQueue = true;
    private String queueName = "perfQueue";  
+   private String address = "perfAddress";
    private int throttleRate;
 
    public int getNoOfMessagesToSend()
@@ -122,7 +123,16 @@
    {
       this.queueName = queueName;
    }
+   
+   public String getAddress()
+   {
+      return address;
+   }
 
+   public void setAddress(final String address)
+   {
+      this.address = address;
+   }
    
    public int getThrottleRate()
    {

Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -21,25 +21,8 @@
    */
 package org.jboss.core.example;
 
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
+import java.util.logging.Logger;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-
-import org.jboss.common.example.JBMExample;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
-import org.jboss.messaging.utils.TokenBucketLimiter;
-import org.jboss.messaging.utils.TokenBucketLimiterImpl;
-
 /**
  * 
  * A PerfSender
@@ -48,268 +31,32 @@
  *
  *
  */
-public class PerfSender extends JBMExample
+public class PerfSender extends PerfBase
 {
+   private static final Logger log = Logger.getLogger(PerfSender.class.getName());
+
    public static void main(String[] args)
-   {      
+   {
       try
       {
-         Properties props = null;
-         
-         InputStream is = null;
-                  
-         try
-         {
-            is = new FileInputStream("perf.properties");
+         PerfParams params = getParams();
 
-            props = new Properties();
-            
-            props.load(is);
-         }
-         finally
-         {
-            if (is != null)
-            {
-               is.close();
-            }
-         }
-
-         int noOfMessages = Integer.valueOf(props.getProperty("num-messages"));
-         int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));        
-         int messageSize = Integer.valueOf(props.getProperty("message-size"));
-         boolean durable = Boolean.valueOf(props.getProperty("durable"));
-         boolean transacted = Boolean.valueOf(props.getProperty("transacted"));
-         int batchSize = Integer.valueOf(props.getProperty("batch-size"));
-         boolean drainQueue =  Boolean.valueOf(props.getProperty("drain-queue"));
-         String queueName = props.getProperty("queue-name");
-         int throttleRate = Integer.valueOf(props.getProperty("throttle-rate"));
-
-         PerfParams perfParams = new PerfParams();
-         perfParams.setNoOfMessagesToSend(noOfMessages);
-         perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
-         perfParams.setMessageSize(messageSize);
-         perfParams.setDurable(durable);
-         perfParams.setSessionTransacted(transacted);
-         perfParams.setBatchSize(batchSize);
-         perfParams.setDrainQueue(drainQueue);
-         perfParams.setQueueName(queueName);
-         perfParams.setThrottleRate(throttleRate);
-
-         new PerfSender(perfParams).run(args);
-      }      
+         new PerfSender(params).run();
+      }
       catch (Exception e)
       {
          e.printStackTrace();
       }
    }
 
-   private final PerfParams perfParams;
-
    private PerfSender(final PerfParams perfParams)
    {
-      super();
-
-      this.perfParams = perfParams;
+      super(perfParams);
    }
 
-   private ClientSessionFactory factory;
-
-   private ClientSession session;
-
-   private long start;
-
-   public boolean runExample() throws Exception
+   public void run() throws Exception
    {
-      runSender(perfParams);
-
-      return true;
+      runSender();
    }
 
-   private void init(final boolean transacted, final String queueName) throws Exception
-   {
-      factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName()));
-
-      factory.setAckBatchSize(perfParams.getBatchSize());
-
-      session = factory.createSession(!transacted, !transacted);
-
-      session.createQueue(perfParams.getQueueName(), perfParams.getQueueName(), perfParams.isDurable());
-   }
-
-   private void displayAverage(final long numberOfMessages, final long start, final long end)
-   {
-      double duration = (1.0 * end - start) / 1000; // in seconds
-      double average = (1.0 * numberOfMessages / duration);
-      log.info(String.format("average: %.2f msg/s (%d messages in %2.2fs)", average, numberOfMessages, duration));
-   }
-
-   private void runSender(final PerfParams perfParams)
-   {
-      try
-      {
-         log.info("params = " + perfParams);
-         init(perfParams.isSessionTransacted(), perfParams.getQueueName());
-         start = System.currentTimeMillis();
-         log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
-         sendMessages(perfParams.getNoOfWarmupMessages(),
-                      perfParams.getBatchSize(),
-                      perfParams.isDurable(),
-                      perfParams.isSessionTransacted(),
-                      false,
-                      perfParams.getThrottleRate(),
-                      perfParams.getMessageSize());
-         log.info("warmed up");
-         start = System.currentTimeMillis();
-         sendMessages(perfParams.getNoOfMessagesToSend(),
-                      perfParams.getBatchSize(),
-                      perfParams.isDurable(),
-                      perfParams.isSessionTransacted(),
-                      true,
-                      perfParams.getThrottleRate(),
-                      perfParams.getMessageSize());
-         long end = System.currentTimeMillis();
-         displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-      finally
-      {
-         if (session != null)
-         {
-            try
-            {
-               session.close();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      }
-   }
-
-   private void sendMessages(final int numberOfMessages,
-                             final int txBatchSize,
-                             final boolean durable,
-                             final boolean transacted,
-                             final boolean display,
-                             final int throttleRate,
-                             final int messageSize) throws Exception
-   {
-      ClientProducer producer = session.createProducer(perfParams.getQueueName());
-
-      ClientMessage message = session.createClientMessage(durable);
-
-      byte[] payload = new byte[messageSize];
-
-      message.getBody().writeBytes(payload);
-
-      final int modulo = 2000;
-
-      TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false) : null;
-
-      boolean committed = false;
-      for (int i = 1; i <= numberOfMessages; i++)
-      {
-         producer.send(message);
-
-         if (transacted)
-         {
-            if (i % txBatchSize == 0)
-            {
-               session.commit();
-               committed = true;
-            }
-            else
-            {
-               committed = false;
-            }
-         }
-         if (display && (i % modulo == 0))
-         {
-            double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
-            log.info(String.format("sent %6d messages in %2.2fs", i, duration));
-         }
-
-         if (tbl != null)
-         {
-            tbl.limit();
-         }
-      }
-      if (transacted && !committed)
-      {
-         session.commit();
-      }
-   }
-
-   private void drainQueue(final MessageConsumer consumer) throws JMSException
-   {
-      log.info("draining queue");
-      while (true)
-      {
-         Message m = consumer.receive(5000);
-         if (m == null)
-         {
-            log.info("queue is drained");
-            break;
-         }
-      }
-   }
-
-   // public boolean runExample() throws Exception
-   // {
-   // final String perfAddress = "perfAddress";
-   //
-   // final String perfQueueName = "perfQueue";
-   //
-   // ClientSessionFactory factory = null;
-   //
-   // ClientSession session = null;
-   //
-   // try
-   // {
-   // factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName()));
-   //
-   // session = factory.createSession();
-   //
-   // session.createQueue(perfAddress, perfQueueName, true);
-   //
-   // ClientProducer producer = session.createProducer(perfAddress);
-   //
-   // ClientMessage message = session.createClientMessage(true);
-   //
-   // message.getBody().writeString("Hello");
-   //
-   // producer.send(message);
-   //
-   // session.start();
-   //
-   // ClientConsumer consumer = session.createConsumer(perfQueueName);
-   //
-   // ClientMessage msgReceived = consumer.receive();
-   //
-   // System.out.println("message = " + msgReceived.getBody().readString());
-   //
-   // consumer.close();
-   //
-   // session.deleteQueue(perfQueueName);
-   //
-   // return true;
-   // }
-   // finally
-   // {
-   // if (session != null)
-   // {
-   // session.close();
-   // }
-   //
-   // if (factory != null)
-   // {
-   // factory.close();
-   // }
-   // }
-   // }
-
 }

Modified: trunk/examples/jms/paging/build.xml
===================================================================
--- trunk/examples/jms/paging/build.xml	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/jms/paging/build.xml	2009-05-21 13:50:51 UTC (rev 6966)
@@ -36,7 +36,7 @@
       <antcall target="runExample">
          <param name="example.classname" value="org.jboss.jms.example.PagingExample"/>
 
-        <!-- We limit the client to running in only 50MB of RAM -->
+         <!-- We limit the client to running in only 50MB of RAM -->
     	   <param name="java-min-memory" value="50M"/>
     	   <param name="java-max-memory" value="50M"/>
       </antcall>

Modified: trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java
===================================================================
--- trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -48,76 +48,77 @@
                                           "-XX:+AggressiveOpts",
                                           "-XX:+UseFastAccessorMethods" };
 
-    new PagingExample().run(serverArgs, args);
+      new PagingExample().run(serverArgs, args);
    }
 
    public boolean runExample() throws Exception
    {
       Connection connection = null;
-      
+
       InitialContext initialContext = null;
       try
       {
-         //Step 1. Create an initial context to perform the JNDI lookup.
+         // Step 1. Create an initial context to perform the JNDI lookup.
          initialContext = getContext(0);
 
-         //Step 2. Perform a lookup on the Connection Factory
-         ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+         // Step 2. Perform a lookup on the Connection Factory
+         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
 
-         // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number of bytes in memory
-         Queue pageQueue = (Queue) initialContext.lookup("/queue/pagingQueue");
-         
-         // Step 4. Lookup for a JMS Queue 
-         Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
+         // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number
+         // of bytes in memory
+         Queue pageQueue = (Queue)initialContext.lookup("/queue/pagingQueue");
 
+         // Step 4. Look up a JMS Queue
+         Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+
          // Step 5. Create a JMS Connection
          connection = cf.createConnection();
-         
-         //Step 6. Create a JMS Session
+
+         // Step 6. Create a JMS Session
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
-         //Step 7. Create a JMS Message Producer for pageQueueAddress
+         // Step 7. Create a JMS Message Producer for pageQueueAddress
          MessageProducer pageMessageProducer = session.createProducer(pageQueue);
-         
-         //Step 8. We don't need persistent messages in order to use paging. (This step is optional)
+
+         // Step 8. We don't need persistent messages in order to use paging. (This step is optional)
          pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         
-         //Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
+
+         // Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
          BytesMessage message = session.createBytesMessage();
          message.writeBytes(new byte[10 * 1024]);
-         
 
-         //Step 10. Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at ./paging/config/jbm-queues.xml for the config.
+         // Step 10. Send only 20 messages to the Queue. This will already be enough for pagingQueue. Look at
+         // ./paging/config/jbm-queues.xml for the config.
          for (int i = 0; i < 20; i++)
          {
             pageMessageProducer.send(message);
-         }         
-         
-         //Step 11. Create a JMS Message Producer
+         }
+
+         // Step 11. Create a JMS Message Producer
          MessageProducer messageProducer = session.createProducer(queue);
-         
-         //Step 12. We don't need persistent messages in order to use paging. (This step is optional)
+
+         // Step 12. We don't need persistent messages in order to use paging. (This step is optional)
          messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
-         //Step 13. Send the message for about 30K, which should be over the memory limit imposed by the server
+         // Step 13. Send the message about 30K times, which should exceed the memory limit imposed by the server
          for (int i = 0; i < 30000; i++)
          {
             messageProducer.send(message);
          }
 
          // Step 14. if you pause this example here, you will see several files under ./build/data/paging
-         // Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created files just for 
-         
-         //Step 15. Create a JMS Message Consumer
+         // Thread.sleep(30000); // if you want to, just out of curiosity, you can sleep here and inspect the
+         // created files
+
+         // Step 15. Create a JMS Message Consumer
          MessageConsumer messageConsumer = session.createConsumer(queue);
-         
 
-         //Step 16.  Start the JMS Connection. This step will activate the subscribers to receive messages.
+         // Step 16. Start the JMS Connection. This step will activate the subscribers to receive messages.
          connection.start();
-         
-         
-         //Step 17. Receive the messages. It's important to ACK for messages as JBM will not read messages from paging until messages are ACKed
-         
+
+         // Step 17. Receive the messages. It's important to ACK messages, as JBM will not read messages from paging
+         // until messages are ACKed
+
          for (int i = 0; i < 30000; i++)
          {
             message = (BytesMessage)messageConsumer.receive(3000);
@@ -128,10 +129,9 @@
                message.acknowledge();
             }
          }
-         
+
          message.acknowledge();
-         
-         
+
          // Step 18. Receive the messages from the Queue names pageQueue. Create the proper consumer for that
          messageConsumer.close();
          messageConsumer = session.createConsumer(pageQueue);
@@ -139,26 +139,26 @@
          for (int i = 0; i < 20; i++)
          {
             message = (BytesMessage)messageConsumer.receive(1000);
-            
+
             System.out.println("Received message " + i + " from pageQueue");
 
             message.acknowledge();
          }
-         
 
          return true;
 
       }
       finally
       {
-         // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
-         
+         // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS
+         // connection will automatically close all of its sessions, consumers, producer and browser objects
+
          if (initialContext != null)
          {
             initialContext.close();
          }
-         
-         if(connection != null)
+
+         if (connection != null)
          {
             connection.close();
          }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -254,7 +254,7 @@
    // Non transactional operations
 
    public void storeMessage(final ServerMessage message) throws Exception
-   {
+   {  
       if (message.getMessageID() <= 0)
       {
          throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -17,6 +17,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.DivertConfiguration;
 import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.deployers.Deployer;
 import org.jboss.messaging.core.deployers.DeploymentManager;
 import org.jboss.messaging.core.deployers.impl.AddressSettingsDeployer;
@@ -91,6 +92,8 @@
 import org.jboss.messaging.utils.VersionLoader;
 
 import javax.management.MBeanServer;
+
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -201,25 +204,43 @@
    // Constructors
    // ---------------------------------------------------------------------------------
 
+   public MessagingServerImpl()
+   {
+      this(null, null, null);
+   }
+   
+   public MessagingServerImpl(final Configuration configuration)
+   {
+      this(configuration, null, null);
+   }
+   
    public MessagingServerImpl(final Configuration configuration,
-                              final MBeanServer mbeanServer,
+                              MBeanServer mbeanServer)
+   {
+      this(configuration, mbeanServer, null);
+   }
+   
+   public MessagingServerImpl(final Configuration configuration,                              
                               final JBMSecurityManager securityManager)
    {
+      this(configuration, null, securityManager);
+   }
+   
+   public MessagingServerImpl(Configuration configuration,
+                              MBeanServer mbeanServer,
+                              final JBMSecurityManager securityManager)
+   {
       if (configuration == null)
       {
-         throw new NullPointerException("Must inject Configuration into MessagingServer constructor");
+         configuration = new ConfigurationImpl();
       }
 
       if (mbeanServer == null)
       {
-         throw new NullPointerException("Must inject MBeanServer into MessagingServer constructor");
+         //Just use JVM mbean server
+         mbeanServer = ManagementFactory.getPlatformMBeanServer();
       }
 
-      if (securityManager == null)
-      {
-         throw new NullPointerException("Must inject SecurityManager into MessagingServer constructor");
-      }
-
       // We need to hard code the version information into a source file
 
       version = VersionLoader.getVersion();
@@ -294,7 +315,10 @@
 
          queueDeployer.stop();
 
-         securityDeployer.stop();
+         if (securityDeployer != null)
+         {            
+            securityDeployer.stop();
+         }
 
          deploymentManager.stop();
       }
@@ -303,7 +327,10 @@
 
       storageManager.stop();
 
-      securityManager.stop();
+      if (securityManager != null)
+      {
+         securityManager.stop();
+      }
 
       if (replicatingConnection != null)
       {
@@ -962,7 +989,10 @@
 
       storageManager.start();
 
-      securityManager.start();
+      if (securityManager != null)
+      {
+         securityManager.start();
+      }
 
       postOffice.start();
 
@@ -977,11 +1007,14 @@
       {
          basicUserCredentialsDeployer = new BasicUserCredentialsDeployer(deploymentManager, securityManager);
 
-         securityDeployer = new SecurityDeployer(deploymentManager, securityRepository);
-
          basicUserCredentialsDeployer.start();
-
-         securityDeployer.start();
+                 
+         if (securityManager != null)
+         {
+            securityDeployer = new SecurityDeployer(deploymentManager, securityRepository);
+            
+            securityDeployer.start();
+         }        
       }
 
       // Load the journal and populate queues, transactions and caches in memory

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -298,6 +298,8 @@
       startServerChannels();
       
       paused = false;
+      
+      log.info("*** started netty acceptor on " + port);
    }
    
    private void startServerChannels()

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -233,6 +233,8 @@
 
    public synchronized void start()
    {
+      log.info("starting netty connector on port " + port);
+      
       if (channelFactory != null)
       {
          return;

Modified: trunk/src/main/org/jboss/messaging/utils/XMLUtil.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/XMLUtil.java	2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/utils/XMLUtil.java	2009-05-21 13:50:51 UTC (rev 6966)
@@ -497,8 +497,6 @@
    {
       SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
       
-      log.info("schema file is " + schemaFile);
-      
       Schema schema = factory.newSchema(Thread.currentThread().getContextClassLoader().getResource(schemaFile));
       Validator validator = schema.newValidator();
 




More information about the jboss-cvs-commits mailing list