[jboss-cvs] JBoss Messaging SVN: r4198 - in trunk: src/main/org/jboss/messaging/util and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 14 15:00:38 EDT 2008


Author: timfox
Date: 2008-05-14 15:00:38 -0400 (Wed, 14 May 2008)
New Revision: 4198

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/util/SimpleString.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
Performance tweak


Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-14 19:00:38 UTC (rev 4198)
@@ -99,7 +99,7 @@
    private AtomicInteger deliveringCount = new AtomicInteger(0);
 
    private volatile FlowController flowController;
-
+   
    public QueueImpl(final long persistenceID, final SimpleString name,
          final Filter filter, final boolean clustered, final boolean durable,
          final boolean temporary, final int maxSize,
@@ -475,121 +475,112 @@
 
    private HandleStatus add(final MessageReference ref, final boolean first)
    {
-      if (!checkFull()) { return HandleStatus.BUSY; }
-
+      if (maxSize != -1)
+      {
+         int size = deliveringCount.get() + messageReferences.size() + scheduledRunnables.size();
+         
+         if (size >= maxSize)
+         {
+            return HandleStatus.BUSY;
+         }      
+      }
+      
       if (!first)
       {
          messagesAdded.incrementAndGet();
       }
-
-      if (!checkAndSchedule(ref))
+      
+      if (checkAndSchedule(ref))
       {
-         boolean add = false;
+         return HandleStatus.HANDLED;         
+      }
 
-         if (direct)
-         {
-            // Deliver directly
+      boolean add = false;
 
-            HandleStatus status = deliver(ref);
+      if (direct)
+      {
+         // Deliver directly
 
-            if (status == HandleStatus.HANDLED)
-            {
-               // Ok
-            }
-            else if (status == HandleStatus.BUSY)
-            {
-               add = true;
-            }
-            else if (status == HandleStatus.NO_MATCH)
-            {
-               add = true;
-            }
+         HandleStatus status = deliver(ref);
 
-            if (add)
-            {
-               direct = false;
-            }
+         if (status == HandleStatus.HANDLED)
+         {
+            // Ok
          }
-         else
+         else if (status == HandleStatus.BUSY)
          {
             add = true;
          }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            add = true;
+         }
 
          if (add)
          {
-            if (first)
-            {
-               messageReferences.addFirst(ref, ref.getMessage().getPriority());
-            }
-            else
-            {
-               messageReferences.addLast(ref, ref.getMessage().getPriority());
-            }
-
-            if (!direct && promptDelivery)
-            {
-               // We have consumers with filters which don't match, so we need
-               // to prompt delivery every time
-               // a new message arrives - this is why you really shouldn't use
-               // filters with queues - in most cases
-               // it's an ant-pattern since it would cause a queue scan on each
-               // message
-               deliver();
-            }
+            direct = false;
          }
       }
+      else
+      {
+         add = true;
+      }
 
-      return HandleStatus.HANDLED;
-   }
-
-   private boolean checkAndSchedule(final MessageReference ref)
-   {
-      long now = System.currentTimeMillis();
-
-      if (scheduledExecutor != null && ref.getScheduledDeliveryTime() > now)
+      if (add)
       {
-         if (trace)
+         if (first)
          {
-            log.trace("Scheduling delivery for " + ref + " to occur at "
-                  + ref.getScheduledDeliveryTime());
+       //     messageReferences.addFirst(ref, ref.getMessage().getPriority());
          }
+         else
+         {
+      //      messageReferences.addLast(ref, ref.getMessage().getPriority());
+         }
 
-         long delay = ref.getScheduledDeliveryTime() - now;
+         if (!direct && promptDelivery)
+         {
+            // We have consumers with filters which don't match, so we need
+            // to prompt delivery every time
+            // a new message arrives - this is why you really shouldn't use
+            // filters with queues - in most cases
+            // it's an ant-pattern since it would cause a queue scan on each
+            // message
+            deliver();
+         }
+      }     
 
-         ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
-
-         scheduledRunnables.add(runnable);
-
-         Future<?> future = scheduledExecutor.schedule(runnable, delay,
-               TimeUnit.MILLISECONDS);
-
-         runnable.setFuture(future);
-
-         return true;
-      }
-      else
-      {
-         return false;
-      }
+      return HandleStatus.HANDLED;
    }
 
-   private boolean checkFull()
+   private boolean checkAndSchedule(final MessageReference ref)
    {
-      if (maxSize != -1
-            && (deliveringCount.get() + messageReferences.size() + scheduledRunnables
-                  .size()) >= maxSize)
-      {
-         if (trace)
+      long deliveryTime = ref.getScheduledDeliveryTime();
+      
+      if (deliveryTime != 0 && scheduledExecutor != null)
+      {      
+         long now = System.currentTimeMillis();
+      
+         if (deliveryTime > now)
          {
-            log.trace(this + " queue is full, rejecting message");
+            if (trace)
+            {
+               log.trace("Scheduling delivery for " + ref + " to occur at "  + deliveryTime);
+            }
+   
+            long delay = deliveryTime - now;
+   
+            ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+   
+            scheduledRunnables.add(runnable);
+   
+            Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+   
+            runnable.setFuture(future);
+   
+            return true;
          }
-
-         return false;
       }
-      else
-      {
-         return true;
-      }
+      return false;      
    }
 
    private HandleStatus deliver(final MessageReference reference)

Modified: trunk/src/main/org/jboss/messaging/util/SimpleString.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleString.java	2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/src/main/org/jboss/messaging/util/SimpleString.java	2008-05-14 19:00:38 UTC (rev 4198)
@@ -6,7 +6,6 @@
  */
 package org.jboss.messaging.util;
 
-import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
 import static org.jboss.messaging.util.DataConstants.SIZE_INT;
 
 import java.io.Serializable;

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageProducerTest.java	2008-05-14 19:00:38 UTC (rev 4198)
@@ -22,15 +22,12 @@
 package org.jboss.test.messaging.jms;
 
 import java.io.Serializable;
-import java.util.concurrent.CountDownLatch;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -55,22 +52,6 @@
 
    // Constructors --------------------------------------------------
 
-   public static void main(String[] args)
-   {
-      try
-      {
-         MessageProducerTest test = new MessageProducerTest();
-         
-         test.setUp();
-         test.testSpeed3();
-         test.tearDown();
-      }
-      catch (Throwable t)
-      {
-         t.printStackTrace();
-      }
-   }
-   
    public MessageProducerTest(String name)
    {
       super(name);
@@ -137,8 +118,6 @@
          pconn = cf.createConnection();
          cconn = cf.createConnection();
          
-         log.info("** created connections");
-
          Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session cs = cconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer p = ps.createProducer(queue1);
@@ -169,209 +148,6 @@
       }
    }
 
-   public void testSpeed() throws Exception
-   {
-      Connection pconn = null;      
-
-      try
-      {
-         pconn = cf.createConnection();
-
-         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
-         MessageProducer p = ps.createProducer(queue1);
-             
-         pconn.start();
-         
-         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         
-         p.setDisableMessageID(true);
-         p.setDisableMessageTimestamp(true);
-
-         final int numMessages = 100000;
-
-         long start = System.currentTimeMillis();
-
-         BytesMessage msg = ps.createBytesMessage();
-         
-         msg.writeBytes(new byte[200]);
-                           
-         for (int i = 0; i < numMessages; i++)
-         {
-            p.send(msg);
-         }
-         
-         long end = System.currentTimeMillis();
-
-         double actualRate = 1000 * (double)numMessages / ( end - start);
-
-         log.info("rate " + actualRate + " msgs /sec");
-
-      }
-      finally
-      {
-         pconn.close();
-      }
-   }
-   
-   public void testSpeed2() throws Exception
-   {
-      Connection pconn = null;      
-
-      try
-      {
-         pconn = cf.createConnection();
-
-         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
-         MessageProducer p = ps.createProducer(queue1);
-         
-         MessageConsumer cons = ps.createConsumer(queue1);
-         
-         pconn.start();
-         
-         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         
-         p.setDisableMessageID(true);
-         p.setDisableMessageTimestamp(true);
-
-         final int numMessages = 10000;
-
-         long start = System.currentTimeMillis();
-
-         BytesMessage msg = ps.createBytesMessage();
-         
-         msg.writeBytes(new byte[1000]);
-         
-         final CountDownLatch latch = new CountDownLatch(1);
-         
-         class MyListener implements MessageListener
-         {
-            int count;
-
-            public void onMessage(Message msg)
-            {
-               count++;
-               
-               if (count == numMessages)
-               {
-                  latch.countDown();
-               }
-            }            
-         }
-         
-         cons.setMessageListener(new MyListener());
-         
-         for (int i = 0; i < numMessages; i++)
-         {
-            p.send(msg);
-         }
-         
-         latch.await();
-         
-         long end = System.currentTimeMillis();
-
-         double actualRate = 1000 * (double)numMessages / ( end - start);
-
-         log.info("rate " + actualRate + " msgs /sec");
-
-      }
-      finally
-      {
-         pconn.close();
-      }
-   }
-   
-   public MessageProducerTest()
-   {
-      super("MessageProducerTest");
-   }
-   
-
-   
-   public void testSpeed3() throws Exception
-   {
-      Connection pconn = null;      
-
-      try
-      {
-         pconn = cf.createConnection();
-
-         Session ps = pconn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
-         MessageProducer p = ps.createProducer(queue1);
-         
-         MessageConsumer cons = ps.createConsumer(queue1);
-             
-         p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         
-         p.setDisableMessageID(true);
-         p.setDisableMessageTimestamp(true);
-
-         final int numMessages = 100000;
-         
-         BytesMessage msg = ps.createBytesMessage();
-         
-         msg.writeBytes(new byte[200]);
-         
-         final CountDownLatch latch = new CountDownLatch(1);
-         
-         class MyListener implements MessageListener
-         {
-            int count;
-
-            public void onMessage(Message msg)
-            {
-               count++;
-               
-               if (count == numMessages)
-               {
-                  latch.countDown();
-               }
-            }            
-         }
-         
-         cons.setMessageListener(new MyListener());
-         
-         long start = System.currentTimeMillis();
-         
-         
-         for (int i = 0; i < numMessages; i++)
-         {
-            p.send(msg);
-         }
-         
-         
-         long end = System.currentTimeMillis();
-
-         double actualRate = 1000 * (double)numMessages / ( end - start);
-         
-         log.info("send rate " + actualRate + " msgs /sec");
-
-         log.info("Sleeping");
-         
-         Thread.sleep(10000);
-         
-         log.info("Let's go....");
-         
-         pconn.start();
-         
-         
-         latch.await();
-         
-         end = System.currentTimeMillis();
-
-         actualRate = 1000 * (double)numMessages / ( end - start);
-
-         log.info("consume rate " + actualRate + " msgs /sec");
-
-      }
-      finally
-      {
-         pconn.close();
-      }
-   }
-   
    public void testTransactedSendPersistent() throws Exception
    {
       transactedSend(true);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-14 18:14:01 UTC (rev 4197)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-14 19:00:38 UTC (rev 4198)
@@ -57,35 +57,35 @@
       
       super.tearDown();
    }
-//   
-//   
-//   public void testCoreClient() throws Exception
-//   {
-//      Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
-//            
-//      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-//      ClientConnection conn = cf.createConnection();
-//      
-//      ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
-//      session.createQueue(QUEUE, QUEUE, null, false, false);
-//      
-//      ClientProducer producer = session.createProducer(QUEUE);
-//
-//      ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
-//            System.currentTimeMillis(), (byte) 1);
-//      message.getBody().putString("testINVMCoreClient");
-//      producer.send(message);
-//
-//      ClientConsumer consumer = session.createConsumer(QUEUE);
-//      conn.start();
-//      
-//      message = consumer.receive(1000);
-//      
-//      assertEquals("testINVMCoreClient", message.getBody().getString());
-//      
-//      conn.close();
-//   }
    
+   
+   public void testCoreClient() throws Exception
+   {
+      Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
+            
+      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+      ClientConnection conn = cf.createConnection();
+      
+      ClientSession session = conn.createClientSession(false, true, true, -1, false, false);
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+      
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
+            System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString("testINVMCoreClient");
+      producer.send(message);
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      conn.start();
+      
+      message = consumer.receive(1000);
+      
+      assertEquals("testINVMCoreClient", message.getBody().getString());
+      
+      conn.close();
+   }
+   
    public static void main(String[] args)
    {
       try




More information about the jboss-cvs-commits mailing list