[jboss-cvs] JBoss Messaging SVN: r4240 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/mina and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon May 19 12:53:33 EDT 2008
Author: timfox
Date: 2008-05-19 12:53:33 -0400 (Mon, 19 May 2008)
New Revision: 4240
Modified:
trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
Log:
some changes mainly to perf example
Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-19 16:06:02 UTC (rev 4239)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-19 16:53:33 UTC (rev 4240)
@@ -49,28 +49,27 @@
public static void main(String[] args)
{
PerfExample perfExample = new PerfExample();
+
+ int noOfMessages = Integer.parseInt(args[1]);
+ int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
+ long samplePeriod = Long.parseLong(args[3]);
+ boolean transacted = Boolean.parseBoolean(args[4]);
+ log.info("Transacted:" + transacted);
+ int transactionBatchSize = Integer.parseInt(args[5]);
+ PerfParams perfParams = new PerfParams();
+ perfParams.setNoOfMessagesToSend(noOfMessages);
+ perfParams.setDeliveryMode(deliveryMode);
+ perfParams.setSamplePeriod(samplePeriod);
+ perfParams.setSessionTransacted(transacted);
+ perfParams.setTransactionBatchSize(transactionBatchSize);
+
if (args[0].equalsIgnoreCase("-l"))
{
- int noOfMessages = Integer.parseInt(args[1]);
- perfExample.runListener(noOfMessages);
+ perfExample.runListener(perfParams);
}
else
{
- int noOfMessages = Integer.parseInt(args[1]);
- int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
- long samplePeriod = Long.parseLong(args[3]);
- boolean transacted = Boolean.parseBoolean(args[4]);
- if(transacted)
- {
- deliveryMode = DeliveryMode.PERSISTENT;
- }
- int transactionBatchSize = Integer.parseInt(args[5]);
- PerfParams perfParams = new PerfParams();
- perfParams.setNoOfMessagesToSend(noOfMessages);
- perfParams.setDeliveryMode(deliveryMode);
- perfParams.setSamplePeriod(samplePeriod);
- perfParams.setSessionTransacted(transacted);
- perfParams.setTransactionBatchSize(transactionBatchSize);
+
perfExample.runSender(perfParams);
}
@@ -83,7 +82,7 @@
queue = (Queue) initialContext.lookup("/queue/testPerfQueue");
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
connection = cf.createConnection();
- session = connection.createSession(transacted, Session.DUPS_OK_ACKNOWLEDGE);
+ session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.DUPS_OK_ACKNOWLEDGE);
}
public void runSender(PerfParams perfParams)
@@ -143,12 +142,11 @@
}
}
- public void runListener(int numberOfMessages)
+ public void runListener(final PerfParams perfParams)
{
try
{
- System.out.println("Running listener for " + numberOfMessages);
- init(false);
+ init(perfParams.isSessionTransacted());
MessageConsumer messageConsumer = session.createConsumer(queue);
CountDownLatch countDownLatch = new CountDownLatch(1);
connection.start();
@@ -161,7 +159,7 @@
break;
}
}
- messageConsumer.setMessageListener(new PerfListener(countDownLatch, numberOfMessages));
+ messageConsumer.setMessageListener(new PerfListener(countDownLatch, perfParams));
log.info("READY!!!");
countDownLatch.await();
@@ -192,15 +190,15 @@
class PerfListener implements MessageListener
{
private CountDownLatch countDownLatch;
- int noOfMessages;
+ PerfParams perfParams;
boolean started = false;
- public PerfListener(CountDownLatch countDownLatch, int noOfMessages)
+ public PerfListener(CountDownLatch countDownLatch, PerfParams perfParams)
{
this.countDownLatch = countDownLatch;
- this.noOfMessages = noOfMessages;
+ this.perfParams = perfParams;
}
public void onMessage(Message message)
@@ -215,8 +213,13 @@
{
BytesMessage bm = (BytesMessage) message;
messageCount++;
- if (messageCount == noOfMessages)
+ boolean committed = checkCommit();
+ if (messageCount == perfParams.getNoOfMessagesToSend())
{
+ if (!committed)
+ {
+ checkCommit();
+ }
countDownLatch.countDown();
scheduler.shutdownNow();
log.info("average " + command.getAverage() + " per sec" );
@@ -227,6 +230,20 @@
e.printStackTrace();
}
}
+
+ private boolean checkCommit() throws Exception
+ {
+ if (perfParams.isSessionTransacted())
+ {
+ if (messageCount % perfParams.getTransactionBatchSize() == 0)
+ {
+ session.commit();
+
+ return true;
+ }
+ }
+ return false;
+ }
}
/**
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-19 16:06:02 UTC (rev 4239)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-19 16:53:33 UTC (rev 4240)
@@ -13,6 +13,7 @@
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-19 16:06:02 UTC (rev 4239)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-05-19 16:53:33 UTC (rev 4240)
@@ -7,6 +7,7 @@
package org.jboss.messaging.core.remoting.impl.mina;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.Packet;
More information about the jboss-cvs-commits
mailing list