[jboss-cvs] JBoss Messaging SVN: r4240 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/mina and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon May 19 12:53:33 EDT 2008


Author: timfox
Date: 2008-05-19 12:53:33 -0400 (Mon, 19 May 2008)
New Revision: 4240

Modified:
   trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
Log:
Some changes, mainly to the perf example.


Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-19 16:06:02 UTC (rev 4239)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-19 16:53:33 UTC (rev 4240)
@@ -49,28 +49,27 @@
    public static void main(String[] args)
    {
       PerfExample perfExample = new PerfExample();
+      
+      int noOfMessages = Integer.parseInt(args[1]);
+      int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
+      long samplePeriod = Long.parseLong(args[3]);
+      boolean transacted = Boolean.parseBoolean(args[4]);
+      log.info("Transacted:" + transacted);
+      int transactionBatchSize = Integer.parseInt(args[5]);
+      PerfParams perfParams = new PerfParams();
+      perfParams.setNoOfMessagesToSend(noOfMessages);
+      perfParams.setDeliveryMode(deliveryMode);
+      perfParams.setSamplePeriod(samplePeriod);
+      perfParams.setSessionTransacted(transacted);
+      perfParams.setTransactionBatchSize(transactionBatchSize);
+      
       if (args[0].equalsIgnoreCase("-l"))
       {
-         int noOfMessages = Integer.parseInt(args[1]);
-         perfExample.runListener(noOfMessages);
+         perfExample.runListener(perfParams);
       }
       else
       {
-         int noOfMessages = Integer.parseInt(args[1]);
-         int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
-         long samplePeriod = Long.parseLong(args[3]);
-         boolean transacted = Boolean.parseBoolean(args[4]);
-         if(transacted)
-         {
-            deliveryMode = DeliveryMode.PERSISTENT;
-         }
-         int transactionBatchSize = Integer.parseInt(args[5]);
-         PerfParams perfParams = new PerfParams();
-         perfParams.setNoOfMessagesToSend(noOfMessages);
-         perfParams.setDeliveryMode(deliveryMode);
-         perfParams.setSamplePeriod(samplePeriod);
-         perfParams.setSessionTransacted(transacted);
-         perfParams.setTransactionBatchSize(transactionBatchSize);
+         
          perfExample.runSender(perfParams);
       }
 
@@ -83,7 +82,7 @@
       queue = (Queue) initialContext.lookup("/queue/testPerfQueue");
       ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
       connection = cf.createConnection();
-      session = connection.createSession(transacted, Session.DUPS_OK_ACKNOWLEDGE);
+      session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.DUPS_OK_ACKNOWLEDGE);
    }
    
    public void runSender(PerfParams perfParams)
@@ -143,12 +142,11 @@
       }
    }
 
-   public void runListener(int numberOfMessages)
+   public void runListener(final PerfParams perfParams)
    {
       try
       {
-         System.out.println("Running listener for " + numberOfMessages);
-         init(false);
+         init(perfParams.isSessionTransacted());
          MessageConsumer messageConsumer = session.createConsumer(queue);
          CountDownLatch countDownLatch = new CountDownLatch(1);
          connection.start();
@@ -161,7 +159,7 @@
                break;
             }
          }
-         messageConsumer.setMessageListener(new PerfListener(countDownLatch, numberOfMessages));
+         messageConsumer.setMessageListener(new PerfListener(countDownLatch, perfParams));
          log.info("READY!!!");
          countDownLatch.await();
 
@@ -192,15 +190,15 @@
    class PerfListener implements MessageListener
    {
       private CountDownLatch countDownLatch;
-      int noOfMessages;
+      PerfParams perfParams;
       
       boolean started = false;
 
 
-      public PerfListener(CountDownLatch countDownLatch, int noOfMessages)
+      public PerfListener(CountDownLatch countDownLatch, PerfParams perfParams)
       {
          this.countDownLatch = countDownLatch;
-         this.noOfMessages = noOfMessages;
+         this.perfParams = perfParams;
       }
 
       public void onMessage(Message message)
@@ -215,8 +213,13 @@
          {
             BytesMessage bm = (BytesMessage) message;
             messageCount++;      
-            if (messageCount == noOfMessages)
+            boolean committed = checkCommit();
+            if (messageCount == perfParams.getNoOfMessagesToSend())
             {
+               if (!committed)
+               {
+                  checkCommit();
+               }
                countDownLatch.countDown();
                scheduler.shutdownNow();
                log.info("average " +  command.getAverage() + " per sec" );
@@ -227,6 +230,20 @@
             e.printStackTrace();
          }
        }
+      
+      private boolean checkCommit() throws Exception
+      {
+         if (perfParams.isSessionTransacted())
+         {
+            if (messageCount % perfParams.getTransactionBatchSize() == 0)
+            {
+               session.commit();
+               
+               return true;
+            }
+         }
+         return false;
+      }
    }
 
    /**

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-19 16:06:02 UTC (rev 4239)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-19 16:53:33 UTC (rev 4240)
@@ -13,6 +13,7 @@
 
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.Packet;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-19 16:06:02 UTC (rev 4239)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java	2008-05-19 16:53:33 UTC (rev 4240)
@@ -7,6 +7,7 @@
 package org.jboss.messaging.core.remoting.impl.mina;
 
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.Packet;




More information about the jboss-cvs-commits mailing list