[jboss-cvs] JBoss Messaging SVN: r6966 - in trunk: docs/user-manual/en and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 21 09:50:52 EDT 2009
Author: timfox
Date: 2009-05-21 09:50:51 -0400 (Thu, 21 May 2009)
New Revision: 6966
Added:
trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
Removed:
trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java
Modified:
trunk/.classpath
trunk/docs/user-manual/en/using-core.xml
trunk/examples/core/perf/
trunk/examples/core/perf/build.xml
trunk/examples/core/perf/perf.properties
trunk/examples/core/perf/server0/jbm-configuration.xml
trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java
trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java
trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java
trunk/examples/jms/paging/build.xml
trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/utils/XMLUtil.java
Log:
perf example
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/.classpath 2009-05-21 13:50:51 UTC (rev 6966)
@@ -15,6 +15,7 @@
<classpathentry kind="src" path="tests/joram-tests/src"/>
<classpathentry kind="src" path="tests/joram-tests/config"/>
<classpathentry kind="src" path="examples/core/embedded/src"/>
+ <classpathentry kind="src" path="examples/core/perf/src"/>
<classpathentry kind="src" path="examples/jms/application-layer-failover/src"/>
<classpathentry kind="src" path="examples/jms/automatic-failover/src"/>
<classpathentry kind="src" path="examples/jms/bridge/src"/>
Modified: trunk/docs/user-manual/en/using-core.xml
===================================================================
--- trunk/docs/user-manual/en/using-core.xml 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/docs/user-manual/en/using-core.xml 2009-05-21 13:50:51 UTC (rev 6966)
@@ -107,7 +107,7 @@
new TransportConfiguration(
InVMConnectorFactory.class.getName()), null);
-ClientSession session = factory.createSession();
+ClientSession session = nettyFactory.createSession();
session.createQueue("example", "example", true);
ClientProducer producer = session.createProducer("example");
Property changes on: trunk/examples/core/perf
___________________________________________________________________
Name: svn:ignore
+ build
data
logs
Modified: trunk/examples/core/perf/build.xml
===================================================================
--- trunk/examples/core/perf/build.xml 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/build.xml 2009-05-21 13:50:51 UTC (rev 6966)
@@ -39,7 +39,7 @@
<path location="server0"/>
</path>
- <target name="runSender">
+ <target name="runSender" depends="compile">
<java classname="org.jboss.core.example.PerfSender" fork="true" resultproperty="example-result">
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx512M"/>
@@ -50,8 +50,8 @@
</java>
</target>
- <target name="runListener">
- <java classname="org.jboss.core.example.PerfSender" fork="true" resultproperty="example-result">
+ <target name="runListener" depends="compile">
+ <java classname="org.jboss.core.example.PerfListener" fork="true" resultproperty="example-result">
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx512M"/>
<jvmarg value="-XX:+UseParallelGC"/>
Modified: trunk/examples/core/perf/perf.properties
===================================================================
--- trunk/examples/core/perf/perf.properties 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/perf.properties 2009-05-21 13:50:51 UTC (rev 6966)
@@ -1,9 +1,10 @@
-num-messages=20000
-num-warmup-messages=2000
-message-size=1400
+num-messages=1000000
+num-warmup-messages=50000
+message-size=0
durable=false
transacted=false
batch-size=1000
drain-queue=true
queue-name=perfQueue
-throttle-rate=20000
\ No newline at end of file
+throttle-rate=-1
+address=perfAddress
\ No newline at end of file
Modified: trunk/examples/core/perf/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/jbm-configuration.xml 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/server0/jbm-configuration.xml 2009-05-21 13:50:51 UTC (rev 6966)
@@ -6,10 +6,21 @@
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">
- <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="jbm.remoting.netty.tcpnodelay" value="false" type="Boolean"/>
+ <param key="jbm.remoting.netty.tcpsendbuffersize" value="1048576" type="Integer"/>
+ <param key="jbm.remoting.netty.tcpreceivebuffersize" value="1048576" type="Integer"/>
</acceptor>
</acceptors>
<security-enabled>false</security-enabled>
+
+ <persistence-enabled>false</persistence-enabled>
+
+ <queues>
+ <queue name="perfQueue">
+ <address>perfAddress</address>
+ </queue>
+ </queues>
</configuration>
Added: trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java (rev 0)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -0,0 +1,419 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.core.example;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
+import org.jboss.messaging.integration.transports.netty.TransportConstants;
+import org.jboss.messaging.utils.TokenBucketLimiter;
+import org.jboss.messaging.utils.TokenBucketLimiterImpl;
+
+/**
+ *
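+ * Base class shared by PerfSender and PerfListener: loads perf.properties into a PerfParams and runs the send/receive loops.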
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public abstract class PerfBase
+{
+ private static final Logger log = Logger.getLogger(PerfBase.class.getName());
+
+ protected static PerfParams getParams() throws Exception
+ {
+ Properties props = null;
+
+ InputStream is = null;
+
+ try
+ {
+ is = new FileInputStream("perf.properties");
+
+ props = new Properties();
+
+ props.load(is);
+ }
+ finally
+ {
+ if (is != null)
+ {
+ is.close();
+ }
+ }
+
+ int noOfMessages = Integer.valueOf(props.getProperty("num-messages"));
+ int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));
+ int messageSize = Integer.valueOf(props.getProperty("message-size"));
+ boolean durable = Boolean.valueOf(props.getProperty("durable"));
+ boolean transacted = Boolean.valueOf(props.getProperty("transacted"));
+ int batchSize = Integer.valueOf(props.getProperty("batch-size"));
+ boolean drainQueue = Boolean.valueOf(props.getProperty("drain-queue"));
+ String queueName = props.getProperty("queue-name");
+ String address = props.getProperty("address");
+ int throttleRate = Integer.valueOf(props.getProperty("throttle-rate"));
+
+ log.info("num-messages: " + noOfMessages);
+ log.info("num-warmup-messages: " + noOfWarmupMessages);
+ log.info("message-size: " + messageSize);
+ log.info("durable: " + durable);
+ log.info("transacted: " + transacted);
+ log.info("batch-size: " + batchSize);
+ log.info("drain-queue: " + drainQueue);
+ log.info("address: " + address);
+ log.info("throttle-rate: " + throttleRate);
+
+ PerfParams perfParams = new PerfParams();
+ perfParams.setNoOfMessagesToSend(noOfMessages);
+ perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
+ perfParams.setMessageSize(messageSize);
+ perfParams.setDurable(durable);
+ perfParams.setSessionTransacted(transacted);
+ perfParams.setBatchSize(batchSize);
+ perfParams.setDrainQueue(drainQueue);
+ perfParams.setQueueName(queueName);
+ perfParams.setAddress(address);
+ perfParams.setThrottleRate(throttleRate);
+
+ return perfParams;
+ }
+
+ private final PerfParams perfParams;
+
+ protected PerfBase(final PerfParams perfParams)
+ {
+ this.perfParams = perfParams;
+ }
+
+ private ClientSessionFactory factory;
+
+ private ClientSession session;
+
+ private long start;
+
+ private void init(final boolean transacted, final String queueName) throws Exception
+ {
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ params.put(TransportConstants.TCP_NODELAY_PROPNAME, false);
+ params.put(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, 1024 * 1024);
+ params.put(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, 1024 * 1024);
+
+ factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName(), params));
+
+ factory.setAckBatchSize(perfParams.getBatchSize());
+
+ session = factory.createSession(!transacted, !transacted);
+ }
+
+ private void displayAverage(final long numberOfMessages, final long start, final long end)
+ {
+ double duration = (1.0 * end - start) / 1000; // in seconds
+ double average = (1.0 * numberOfMessages / duration);
+ log.info(String.format("average: %.2f msg/s (%d messages in %2.2fs)", average, numberOfMessages, duration));
+ }
+
+ protected void runSender()
+ {
+ try
+ {
+ log.info("params = " + perfParams);
+ init(perfParams.isSessionTransacted(), perfParams.getQueueName());
+
+ if (perfParams.isDrainQueue())
+ {
+ drainQueue();
+ }
+
+ start = System.currentTimeMillis();
+ log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
+ sendMessages(perfParams.getNoOfWarmupMessages(),
+ perfParams.getBatchSize(),
+ perfParams.isDurable(),
+ perfParams.isSessionTransacted(),
+ false,
+ perfParams.getThrottleRate(),
+ perfParams.getMessageSize());
+ log.info("warmed up");
+ start = System.currentTimeMillis();
+ sendMessages(perfParams.getNoOfMessagesToSend(),
+ perfParams.getBatchSize(),
+ perfParams.isDurable(),
+ perfParams.isSessionTransacted(),
+ true,
+ perfParams.getThrottleRate(),
+ perfParams.getMessageSize());
+ long end = System.currentTimeMillis();
+ displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ protected void runListener()
+ {
+ try
+ {
+
+ init(perfParams.isSessionTransacted(), perfParams.getQueueName());
+
+ if (perfParams.isDrainQueue())
+ {
+ drainQueue();
+ }
+
+ ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
+
+ session.start();
+
+ log.info("READY!!!");
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ consumer.setMessageHandler(new PerfListener(countDownLatch, perfParams));
+ countDownLatch.await();
+ long end = System.currentTimeMillis();
+ // start was set by the message handler when the first message after warm-up was received
+ displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private void drainQueue() throws Exception
+ {
+ log.info("Draining queue");
+ ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
+
+ session.start();
+
+ ClientMessage message = null;
+
+ int count = 0;
+ do
+ {
+ message = consumer.receive(3000);
+
+ if (message != null)
+ {
+ message.acknowledge();
+
+ count++;
+ }
+ }
+ while (message != null);
+
+ consumer.close();
+
+ log.info("Drained " + count + " messages");
+ }
+
+ private void sendMessages(final int numberOfMessages,
+ final int txBatchSize,
+ final boolean durable,
+ final boolean transacted,
+ final boolean display,
+ final int throttleRate,
+ final int messageSize) throws Exception
+ {
+ ClientProducer producer = session.createProducer(perfParams.getAddress());
+
+ ClientMessage message = session.createClientMessage(durable);
+
+ byte[] payload = new byte[messageSize];
+
+ message.getBody().writeBytes(payload);
+
+ final int modulo = 2000;
+
+ TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false) : null;
+
+ boolean committed = false;
+ for (int i = 1; i <= numberOfMessages; i++)
+ {
+ producer.send(message);
+
+ if (transacted)
+ {
+ if (i % txBatchSize == 0)
+ {
+ session.commit();
+ committed = true;
+ }
+ else
+ {
+ committed = false;
+ }
+ }
+ if (display && (i % modulo == 0))
+ {
+ double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+ log.info(String.format("sent %6d messages in %2.2fs", i, duration));
+ }
+
+ if (tbl != null)
+ {
+ tbl.limit();
+ }
+ }
+ if (transacted && !committed)
+ {
+ session.commit();
+ }
+ }
+
+ private class PerfListener implements MessageHandler
+ {
+ private final CountDownLatch countDownLatch;
+
+ private final PerfParams perfParams;
+
+ private boolean warmingUp = true;
+
+ private boolean started = false;
+
+ private final int modulo;
+
+ private final AtomicLong count = new AtomicLong(0);
+
+ public PerfListener(final CountDownLatch countDownLatch, final PerfParams perfParams)
+ {
+ this.countDownLatch = countDownLatch;
+ this.perfParams = perfParams;
+ warmingUp = perfParams.getNoOfWarmupMessages() > 0;
+ this.modulo = 2000;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+ try
+ {
+ if (warmingUp)
+ {
+ boolean committed = checkCommit();
+ if (count.incrementAndGet() == perfParams.getNoOfWarmupMessages())
+ {
+ log.info("warmed up after receiving " + count.longValue() + " msgs");
+ if (!committed)
+ {
+ checkCommit();
+ }
+ warmingUp = false;
+ }
+ return;
+ }
+
+ if (!started)
+ {
+ started = true;
+ // reset count to take stats
+ count.set(0);
+ start = System.currentTimeMillis();
+ }
+
+ message.acknowledge();
+
+ long currentCount = count.incrementAndGet();
+ boolean committed = checkCommit();
+ if (currentCount == perfParams.getNoOfMessagesToSend())
+ {
+ if (!committed)
+ {
+ checkCommit();
+ }
+ countDownLatch.countDown();
+ }
+ if (currentCount % modulo == 0)
+ {
+ double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+ log.info(String.format("received %6d messages in %2.2fs", currentCount, duration));
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private boolean checkCommit() throws Exception
+ {
+ if (perfParams.isSessionTransacted())
+ {
+ if (count.longValue() % perfParams.getBatchSize() == 0)
+ {
+ session.commit();
+
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+}
Deleted: trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfExample.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -1,103 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.core.example;
-
-import org.jboss.common.example.JBMExample;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
-
-/**
- *
- * A PerfExample
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class PerfExample extends JBMExample
-{
- public static void main(String[] args)
- {
- new PerfExample().run(args);
- }
-
- public boolean runExample() throws Exception
- {
- final String perfAddress = "perfAddress";
-
- final String perfQueueName = "perfQueue";
-
- ClientSessionFactory factory = null;
-
- ClientSession session = null;
-
- try
- {
- factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName()));
-
- session = factory.createSession();
-
- session.createQueue(perfAddress, perfQueueName, true);
-
- ClientProducer producer = session.createProducer(perfAddress);
-
- ClientMessage message = session.createClientMessage(true);
-
- message.getBody().writeString("Hello");
-
- producer.send(message);
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(perfQueueName);
-
- ClientMessage msgReceived = consumer.receive();
-
- System.out.println("message = " + msgReceived.getBody().readString());
-
- consumer.close();
-
- session.deleteQueue(perfQueueName);
-
- return true;
- }
- finally
- {
- if (session != null)
- {
- session.close();
- }
-
- if (factory != null)
- {
- factory.close();
- }
- }
- }
-
-}
Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfListener.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -1,54 +1,62 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.core.example;
+import java.util.logging.Logger;
-package org.jboss.core.example;
-
/**
+ *
* A PerfListener
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*
*/
-public class PerfListener
+public class PerfListener extends PerfBase
{
+ private static final Logger log = Logger.getLogger(PerfListener.class.getName());
- // Constants -----------------------------------------------------
+ public static void main(String[] args)
+ {
+ try
+ {
+ PerfParams params = getParams();
- // Attributes ----------------------------------------------------
+ new PerfListener(params).run();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
- // Static --------------------------------------------------------
+ private PerfListener(final PerfParams perfParams)
+ {
+ super(perfParams);
+ }
- // Constructors --------------------------------------------------
+ public void run() throws Exception
+ {
+ runListener();
+ }
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfParams.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -41,6 +41,7 @@
private int batchSize = 5000;
private boolean drainQueue = true;
private String queueName = "perfQueue";
+ private String address = "perfAddress";
private int throttleRate;
public int getNoOfMessagesToSend()
@@ -122,7 +123,16 @@
{
this.queueName = queueName;
}
+
+ public String getAddress()
+ {
+ return address;
+ }
+ public void setAddress(final String address)
+ {
+ this.address = address;
+ }
public int getThrottleRate()
{
Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfSender.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -21,25 +21,8 @@
*/
package org.jboss.core.example;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
+import java.util.logging.Logger;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-
-import org.jboss.common.example.JBMExample;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
-import org.jboss.messaging.utils.TokenBucketLimiter;
-import org.jboss.messaging.utils.TokenBucketLimiterImpl;
-
/**
*
* A PerfSender
@@ -48,268 +31,32 @@
*
*
*/
-public class PerfSender extends JBMExample
+public class PerfSender extends PerfBase
{
+ private static final Logger log = Logger.getLogger(PerfSender.class.getName());
+
public static void main(String[] args)
- {
+ {
try
{
- Properties props = null;
-
- InputStream is = null;
-
- try
- {
- is = new FileInputStream("perf.properties");
+ PerfParams params = getParams();
- props = new Properties();
-
- props.load(is);
- }
- finally
- {
- if (is != null)
- {
- is.close();
- }
- }
-
- int noOfMessages = Integer.valueOf(props.getProperty("num-messages"));
- int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages"));
- int messageSize = Integer.valueOf(props.getProperty("message-size"));
- boolean durable = Boolean.valueOf(props.getProperty("durable"));
- boolean transacted = Boolean.valueOf(props.getProperty("transacted"));
- int batchSize = Integer.valueOf(props.getProperty("batch-size"));
- boolean drainQueue = Boolean.valueOf(props.getProperty("drain-queue"));
- String queueName = props.getProperty("queue-name");
- int throttleRate = Integer.valueOf(props.getProperty("throttle-rate"));
-
- PerfParams perfParams = new PerfParams();
- perfParams.setNoOfMessagesToSend(noOfMessages);
- perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
- perfParams.setMessageSize(messageSize);
- perfParams.setDurable(durable);
- perfParams.setSessionTransacted(transacted);
- perfParams.setBatchSize(batchSize);
- perfParams.setDrainQueue(drainQueue);
- perfParams.setQueueName(queueName);
- perfParams.setThrottleRate(throttleRate);
-
- new PerfSender(perfParams).run(args);
- }
+ new PerfSender(params).run();
+ }
catch (Exception e)
{
e.printStackTrace();
}
}
- private final PerfParams perfParams;
-
private PerfSender(final PerfParams perfParams)
{
- super();
-
- this.perfParams = perfParams;
+ super(perfParams);
}
- private ClientSessionFactory factory;
-
- private ClientSession session;
-
- private long start;
-
- public boolean runExample() throws Exception
+ public void run() throws Exception
{
- runSender(perfParams);
-
- return true;
+ runSender();
}
- private void init(final boolean transacted, final String queueName) throws Exception
- {
- factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName()));
-
- factory.setAckBatchSize(perfParams.getBatchSize());
-
- session = factory.createSession(!transacted, !transacted);
-
- session.createQueue(perfParams.getQueueName(), perfParams.getQueueName(), perfParams.isDurable());
- }
-
- private void displayAverage(final long numberOfMessages, final long start, final long end)
- {
- double duration = (1.0 * end - start) / 1000; // in seconds
- double average = (1.0 * numberOfMessages / duration);
- log.info(String.format("average: %.2f msg/s (%d messages in %2.2fs)", average, numberOfMessages, duration));
- }
-
- private void runSender(final PerfParams perfParams)
- {
- try
- {
- log.info("params = " + perfParams);
- init(perfParams.isSessionTransacted(), perfParams.getQueueName());
- start = System.currentTimeMillis();
- log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
- sendMessages(perfParams.getNoOfWarmupMessages(),
- perfParams.getBatchSize(),
- perfParams.isDurable(),
- perfParams.isSessionTransacted(),
- false,
- perfParams.getThrottleRate(),
- perfParams.getMessageSize());
- log.info("warmed up");
- start = System.currentTimeMillis();
- sendMessages(perfParams.getNoOfMessagesToSend(),
- perfParams.getBatchSize(),
- perfParams.isDurable(),
- perfParams.isSessionTransacted(),
- true,
- perfParams.getThrottleRate(),
- perfParams.getMessageSize());
- long end = System.currentTimeMillis();
- displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- finally
- {
- if (session != null)
- {
- try
- {
- session.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- }
-
- private void sendMessages(final int numberOfMessages,
- final int txBatchSize,
- final boolean durable,
- final boolean transacted,
- final boolean display,
- final int throttleRate,
- final int messageSize) throws Exception
- {
- ClientProducer producer = session.createProducer(perfParams.getQueueName());
-
- ClientMessage message = session.createClientMessage(durable);
-
- byte[] payload = new byte[messageSize];
-
- message.getBody().writeBytes(payload);
-
- final int modulo = 2000;
-
- TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false) : null;
-
- boolean committed = false;
- for (int i = 1; i <= numberOfMessages; i++)
- {
- producer.send(message);
-
- if (transacted)
- {
- if (i % txBatchSize == 0)
- {
- session.commit();
- committed = true;
- }
- else
- {
- committed = false;
- }
- }
- if (display && (i % modulo == 0))
- {
- double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
- log.info(String.format("sent %6d messages in %2.2fs", i, duration));
- }
-
- if (tbl != null)
- {
- tbl.limit();
- }
- }
- if (transacted && !committed)
- {
- session.commit();
- }
- }
-
- private void drainQueue(final MessageConsumer consumer) throws JMSException
- {
- log.info("draining queue");
- while (true)
- {
- Message m = consumer.receive(5000);
- if (m == null)
- {
- log.info("queue is drained");
- break;
- }
- }
- }
-
- // public boolean runExample() throws Exception
- // {
- // final String perfAddress = "perfAddress";
- //
- // final String perfQueueName = "perfQueue";
- //
- // ClientSessionFactory factory = null;
- //
- // ClientSession session = null;
- //
- // try
- // {
- // factory = new ClientSessionFactoryImpl(new TransportConfiguration(NettyConnectorFactory.class.getName()));
- //
- // session = factory.createSession();
- //
- // session.createQueue(perfAddress, perfQueueName, true);
- //
- // ClientProducer producer = session.createProducer(perfAddress);
- //
- // ClientMessage message = session.createClientMessage(true);
- //
- // message.getBody().writeString("Hello");
- //
- // producer.send(message);
- //
- // session.start();
- //
- // ClientConsumer consumer = session.createConsumer(perfQueueName);
- //
- // ClientMessage msgReceived = consumer.receive();
- //
- // System.out.println("message = " + msgReceived.getBody().readString());
- //
- // consumer.close();
- //
- // session.deleteQueue(perfQueueName);
- //
- // return true;
- // }
- // finally
- // {
- // if (session != null)
- // {
- // session.close();
- // }
- //
- // if (factory != null)
- // {
- // factory.close();
- // }
- // }
- // }
-
}
Modified: trunk/examples/jms/paging/build.xml
===================================================================
--- trunk/examples/jms/paging/build.xml 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/jms/paging/build.xml 2009-05-21 13:50:51 UTC (rev 6966)
@@ -36,7 +36,7 @@
<antcall target="runExample">
<param name="example.classname" value="org.jboss.jms.example.PagingExample"/>
- <!-- We limit the client to running in only 50MB of RAM -->
+ <!-- We limit the client to running in only 50MB of RAM -->
<param name="java-min-memory" value="50M"/>
<param name="java-max-memory" value="50M"/>
</antcall>
Modified: trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java
===================================================================
--- trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/examples/jms/paging/src/org/jboss/jms/example/PagingExample.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -48,76 +48,77 @@
"-XX:+AggressiveOpts",
"-XX:+UseFastAccessorMethods" };
- new PagingExample().run(serverArgs, args);
+ new PagingExample().run(serverArgs, args);
}
public boolean runExample() throws Exception
{
Connection connection = null;
-
+
InitialContext initialContext = null;
try
{
- //Step 1. Create an initial context to perform the JNDI lookup.
+ // Step 1. Create an initial context to perform the JNDI lookup.
initialContext = getContext(0);
- //Step 2. Perform a lookup on the Connection Factory
- ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+ // Step 2. Perform a lookup on the Connection Factory
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number of bytes in memory
- Queue pageQueue = (Queue) initialContext.lookup("/queue/pagingQueue");
-
- // Step 4. Lookup for a JMS Queue
- Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
+ // Step 3. We look-up the JMS queue object from JNDI. pagingQueue is configured to hold a very limited number
+ // of bytes in memory
+ Queue pageQueue = (Queue)initialContext.lookup("/queue/pagingQueue");
+ // Step 4. Look up a JMS Queue
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+
// Step 5. Create a JMS Connection
connection = cf.createConnection();
-
- //Step 6. Create a JMS Session
+
+ // Step 6. Create a JMS Session
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- //Step 7. Create a JMS Message Producer for pageQueueAddress
+ // Step 7. Create a JMS Message Producer for pageQueueAddress
MessageProducer pageMessageProducer = session.createProducer(pageQueue);
-
- //Step 8. We don't need persistent messages in order to use paging. (This step is optional)
+
+ // Step 8. We don't need persistent messages in order to use paging. (This step is optional)
pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- //Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
+
+ // Step 9. Create a Binary Bytes Message with 10K arbitrary bytes
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10 * 1024]);
-
- //Step 10. Send only 20 messages to the Queue. This will be already enough for pagingQueue. Look at ./paging/config/jbm-queues.xml for the config.
+ // Step 10. Send only 20 messages to the Queue. This will already be enough for pagingQueue. Look at
+ // ./paging/config/jbm-queues.xml for the config.
for (int i = 0; i < 20; i++)
{
pageMessageProducer.send(message);
- }
-
- //Step 11. Create a JMS Message Producer
+ }
+
+ // Step 11. Create a JMS Message Producer
MessageProducer messageProducer = session.createProducer(queue);
-
- //Step 12. We don't need persistent messages in order to use paging. (This step is optional)
+
+ // Step 12. We don't need persistent messages in order to use paging. (This step is optional)
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- //Step 13. Send the message for about 30K, which should be over the memory limit imposed by the server
+ // Step 13. Send 30000 messages, which should be over the memory limit imposed by the server
for (int i = 0; i < 30000; i++)
{
messageProducer.send(message);
}
// Step 14. if you pause this example here, you will see several files under ./build/data/paging
- // Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created files just for
-
- //Step 15. Create a JMS Message Consumer
+ // Thread.sleep(30000); // if you want to, just out of curiosity, you can sleep here and inspect the created
+ // files
+
+ // Step 15. Create a JMS Message Consumer
MessageConsumer messageConsumer = session.createConsumer(queue);
-
- //Step 16. Start the JMS Connection. This step will activate the subscribers to receive messages.
+ // Step 16. Start the JMS Connection. This step will activate the subscribers to receive messages.
connection.start();
-
-
- //Step 17. Receive the messages. It's important to ACK for messages as JBM will not read messages from paging until messages are ACKed
-
+
+ // Step 17. Receive the messages. It's important to ACK messages, as JBM will not read messages from paging
+ // until messages are ACKed
+
for (int i = 0; i < 30000; i++)
{
message = (BytesMessage)messageConsumer.receive(3000);
@@ -128,10 +129,9 @@
message.acknowledge();
}
}
-
+
message.acknowledge();
-
-
+
// Step 18. Receive the messages from the Queue names pageQueue. Create the proper consumer for that
messageConsumer.close();
messageConsumer = session.createConsumer(pageQueue);
@@ -139,26 +139,26 @@
for (int i = 0; i < 20; i++)
{
message = (BytesMessage)messageConsumer.receive(1000);
-
+
System.out.println("Received message " + i + " from pageQueue");
message.acknowledge();
}
-
return true;
}
finally
{
- // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
-
+ // And finally, always remember to close your JMS connections after use, in a finally block. Closing a JMS
+ // connection will automatically close all of its sessions, consumers, producers and browser objects
+
if (initialContext != null)
{
initialContext.close();
}
-
- if(connection != null)
+
+ if (connection != null)
{
connection.close();
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -254,7 +254,7 @@
// Non transactional operations
public void storeMessage(final ServerMessage message) throws Exception
- {
+ {
if (message.getMessageID() <= 0)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -17,6 +17,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.DivertConfiguration;
import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.deployers.Deployer;
import org.jboss.messaging.core.deployers.DeploymentManager;
import org.jboss.messaging.core.deployers.impl.AddressSettingsDeployer;
@@ -91,6 +92,8 @@
import org.jboss.messaging.utils.VersionLoader;
import javax.management.MBeanServer;
+
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -201,25 +204,43 @@
// Constructors
// ---------------------------------------------------------------------------------
+ public MessagingServerImpl()
+ {
+ this(null, null, null);
+ }
+
+ public MessagingServerImpl(final Configuration configuration)
+ {
+ this(configuration, null, null);
+ }
+
public MessagingServerImpl(final Configuration configuration,
- final MBeanServer mbeanServer,
+ MBeanServer mbeanServer)
+ {
+ this(configuration, mbeanServer, null);
+ }
+
+ public MessagingServerImpl(final Configuration configuration,
final JBMSecurityManager securityManager)
{
+ this(configuration, null, securityManager);
+ }
+
+ public MessagingServerImpl(Configuration configuration,
+ MBeanServer mbeanServer,
+ final JBMSecurityManager securityManager)
+ {
if (configuration == null)
{
- throw new NullPointerException("Must inject Configuration into MessagingServer constructor");
+ configuration = new ConfigurationImpl();
}
if (mbeanServer == null)
{
- throw new NullPointerException("Must inject MBeanServer into MessagingServer constructor");
+ //Just use JVM mbean server
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
}
- if (securityManager == null)
- {
- throw new NullPointerException("Must inject SecurityManager into MessagingServer constructor");
- }
-
// We need to hard code the version information into a source file
version = VersionLoader.getVersion();
@@ -294,7 +315,10 @@
queueDeployer.stop();
- securityDeployer.stop();
+ if (securityDeployer != null)
+ {
+ securityDeployer.stop();
+ }
deploymentManager.stop();
}
@@ -303,7 +327,10 @@
storageManager.stop();
- securityManager.stop();
+ if (securityManager != null)
+ {
+ securityManager.stop();
+ }
if (replicatingConnection != null)
{
@@ -962,7 +989,10 @@
storageManager.start();
- securityManager.start();
+ if (securityManager != null)
+ {
+ securityManager.start();
+ }
postOffice.start();
@@ -977,11 +1007,14 @@
{
basicUserCredentialsDeployer = new BasicUserCredentialsDeployer(deploymentManager, securityManager);
- securityDeployer = new SecurityDeployer(deploymentManager, securityRepository);
-
basicUserCredentialsDeployer.start();
-
- securityDeployer.start();
+
+ if (securityManager != null)
+ {
+ securityDeployer = new SecurityDeployer(deploymentManager, securityRepository);
+
+ securityDeployer.start();
+ }
}
// Load the journal and populate queues, transactions and caches in memory
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -298,6 +298,8 @@
startServerChannels();
paused = false;
+
+ log.info("*** started netty acceptor on " + port);
}
private void startServerChannels()
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -233,6 +233,8 @@
public synchronized void start()
{
+ log.info("starting netty connector on port " + port);
+
if (channelFactory != null)
{
return;
Modified: trunk/src/main/org/jboss/messaging/utils/XMLUtil.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/XMLUtil.java 2009-05-21 13:18:47 UTC (rev 6965)
+++ trunk/src/main/org/jboss/messaging/utils/XMLUtil.java 2009-05-21 13:50:51 UTC (rev 6966)
@@ -497,8 +497,6 @@
{
SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
- log.info("schema file is " + schemaFile);
-
Schema schema = factory.newSchema(Thread.currentThread().getContextClassLoader().getResource(schemaFile));
Validator validator = schema.newValidator();
More information about the jboss-cvs-commits
mailing list