[jboss-cvs] JBoss Messaging SVN: r4203 - in trunk: src/main/org/jboss/messaging/core/server and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 15 07:38:09 EDT 2008


Author: timfox
Date: 2008-05-15 07:38:09 -0400 (Thu, 15 May 2008)
New Revision: 4203

Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/util/TypedProperties.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
Log:
Performance improvements; also correct TypedProperties size change


Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -23,6 +23,7 @@
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.mina.common.IoSession;
@@ -30,7 +31,6 @@
 import org.apache.mina.common.IoSessionDataStructureFactory;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteRequestQueue;
-import org.apache.mina.util.CircularQueue;
 
 /**
  * 
@@ -52,7 +52,7 @@
    public WriteRequestQueue getWriteRequestQueue(IoSession session)
          throws Exception
    {
-      return new DefaultWriteRequestQueue();
+      return new ConcurrentWriteRequestQueue();
    }
    
    
@@ -134,38 +134,9 @@
   }
    
    
-   private static class DefaultWriteRequestQueue implements WriteRequestQueue
-   {
-      private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
-      
-      public void dispose(IoSession session) {
-      }
-      
-      public void clear(IoSession session) {
-          q.clear();
-      }
-
-      public synchronized boolean isEmpty(IoSession session) {
-          return q.isEmpty();
-      }
-
-      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
-          q.offer(writeRequest);
-      }
-
-      public synchronized WriteRequest poll(IoSession session) {
-          return q.poll();
-      }
-      
-      @Override
-      public String toString() {
-          return q.toString();
-      }
-  }
-   
 //   private static class DefaultWriteRequestQueue implements WriteRequestQueue
 //   {
-//      private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
+//      private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
 //      
 //      public void dispose(IoSession session) {
 //      }
@@ -191,5 +162,34 @@
 //          return q.toString();
 //      }
 //  }
+   
+   private static class ConcurrentWriteRequestQueue implements WriteRequestQueue
+   {
+      private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
+      
+      public void dispose(IoSession session) {
+      }
+      
+      public void clear(IoSession session) {
+          q.clear();
+      }
 
+      public synchronized boolean isEmpty(IoSession session) {
+          return q.isEmpty();
+      }
+
+      public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+          q.offer(writeRequest);
+      }
+
+      public synchronized WriteRequest poll(IoSession session) {
+          return q.poll();
+      }
+      
+      @Override
+      public String toString() {
+          return q.toString();
+      }
+  }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -141,7 +141,6 @@
             }
          }
       });
-
    }
 
    private final int high = 2000;
@@ -177,10 +176,6 @@
 
    public void acquireSemaphore() throws Exception
    {
-      // if (!sem.tryAcquire(5000, TimeUnit.MILLISECONDS))
-      // {
-      // throw new IllegalStateException("Timed out");
-      // }
       int newcount = count.incrementAndGet();
 
       if (newcount == high)
@@ -214,7 +209,17 @@
          {
             public void send(Packet p) throws Exception
             {
+               try
+               {
+                  acquireSemaphore();
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to acquire sem", e);
+               }
+               
                dispatcher.callFilters(p);
+
                session.write(p);
             }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -42,9 +42,9 @@
    
    MessageReference createReference(Queue queue);   
 
-   void decrementDurableRefCount();
+   int decrementDurableRefCount();
    
-   void incrementDurableRefCount();
+   int incrementDurableRefCount();
    
    int getDurableRefCount();
    

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -43,7 +43,6 @@
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
@@ -172,6 +171,8 @@
       deliver();
    }
  
+   //private volatile int count = 0;
+   
    /*
     * Attempt to deliver all the messages in the queue
     * 
@@ -219,7 +220,11 @@
          {
             if (iterator == null)
             {
-               messageReferences.removeFirst();
+//               count++;
+//               if (count == 500000)
+//               {
+                  messageReferences.removeFirst();
+            //   }
             }
             else
             {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -120,14 +120,14 @@
       return durableRefCount.get();
    }
    
-   public void decrementDurableRefCount()
+   public int decrementDurableRefCount()
    {
-      durableRefCount.decrementAndGet();
+      return durableRefCount.decrementAndGet();
    }
    
-   public void incrementDurableRefCount()
+   public int incrementDurableRefCount()
    {
-      durableRefCount.incrementAndGet();
+      return durableRefCount.incrementAndGet();
    }
      
    public ServerMessage copy()

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -24,11 +24,12 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -55,7 +56,6 @@
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.server.Delivery;
-import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConnection;
@@ -128,13 +128,15 @@
 
    private final Set<ServerProducer> producers = new ConcurrentHashSet<ServerProducer>();
 
-   private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
+   private final java.util.Queue<Delivery> deliveries = new ConcurrentLinkedQueue<Delivery>();
 
-   private long deliveryIDSequence = 0;
+   private final AtomicLong deliveryIDSequence = new AtomicLong(0);
 
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
    private Transaction tx;
+   
+   private final Object rollbackCancelLock = new Object();
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -223,12 +225,18 @@
       dispatcher.unregister(producer.getID());
    }
 
-   public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
+   public void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
-      Delivery delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), deliveryIDSequence++, sender);
-
-      deliveries.add(delivery);
-
+      Delivery delivery;
+      synchronized (rollbackCancelLock)
+      {
+         long nextID = deliveryIDSequence.getAndIncrement();
+         
+         delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), nextID, sender);
+         
+         deliveries.add(delivery);
+      }
+                 
       delivery.deliver();
    }
 
@@ -324,7 +332,7 @@
       }
    }
 
-   public synchronized void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
+   public void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
    {
    	/*
        Note that we do not consider it an error if the deliveries cannot be found to be acked.
@@ -421,7 +429,7 @@
       }
 
       // Synchronize to prevent any new deliveries arriving during this recovery. 
-      synchronized (this)
+      synchronized (rollbackCancelLock)
       {
          // Add any unacked deliveries into the tx. Doing this ensures all references are rolled back in the correct
          // order in a single contiguous block
@@ -433,7 +441,7 @@
         
          deliveries.clear();
          
-         deliveryIDSequence -= tx.getAcknowledgementsCount();
+         deliveryIDSequence.addAndGet(-tx.getAcknowledgementsCount());
       }
       
       tx.rollback(queueSettingsRepository);
@@ -449,7 +457,7 @@
 
          Transaction cancelTx;
 
-         synchronized (this)
+         synchronized (rollbackCancelLock)
          {
             cancelTx = new TransactionImpl(persistenceManager, postOffice);
 
@@ -1066,19 +1074,16 @@
 
 		if (message.isDurable() && queue.isDurable())
 		{
-			synchronized (message)
+			int count = message.decrementDurableRefCount();
+
+			if (count == 0)
 			{
-				message.decrementDurableRefCount();
-
-				if (message.getDurableRefCount() == 0)
-				{
-					persistenceManager.storeDelete(message.getMessageID());
-				}
-				else
-				{
-					persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
-				}
+				persistenceManager.storeDelete(message.getMessageID());
 			}
+			else
+			{
+				persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
+			}			
 		}
 
 		queue.referenceAcknowledged();

Modified: trunk/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/src/main/org/jboss/messaging/util/TypedProperties.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -45,7 +45,6 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.logging.Logger;
@@ -64,8 +63,9 @@
 	private static final Logger log = Logger.getLogger(TypedProperties.class);
 	
 	private Map<SimpleString, PropertyValue> properties;
-	AtomicInteger size = new AtomicInteger(0);
 	
+	private volatile int size;
+	
 	public TypedProperties()
 	{		
 	}
@@ -168,7 +168,7 @@
    	   int numHeaders = buffer.getInt();
    	 		
       	properties = new HashMap<SimpleString, PropertyValue>(numHeaders);
-      	size.set(0);
+      	size = 0;
    		
    		for (int i = 0; i < numHeaders; i++)
    		{
@@ -290,7 +290,7 @@
 	   }
 	   else
 	   {
-         return SIZE_BYTE + SIZE_INT + size.intValue();
+         return SIZE_BYTE + SIZE_INT + size;
          
 	   }
 	}
@@ -318,11 +318,11 @@
       PropertyValue oldValue = properties.put(key, value);
       if (oldValue != null)
       {
-         size.addAndGet(value.encodeSize() - oldValue.encodeSize());
+         size += value.encodeSize() - oldValue.encodeSize();
       }
       else
       {
-         size.addAndGet(SimpleString.sizeofString(key) + value.encodeSize());
+         size += SimpleString.sizeofString(key) + value.encodeSize();
       }
    }
    
@@ -334,15 +334,15 @@
 		}
 		
 		PropertyValue val = properties.remove(key);
-		
-		size.addAndGet((SimpleString.sizeofString(key) + val.encodeSize()) * -1);
-		
+				
 		if (val == null)
 		{
          return null;
 		}
 		else
 		{
+		   size -= SimpleString.sizeofString(key) + val.encodeSize();
+	      
 			return val.getValue();
 		}
 	}

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -339,8 +339,6 @@
 
          assertTrue(fast.processed == numMessages - 2);
          
-        // Thread.sleep(10000);
-         
       }
       finally
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-14 23:29:46 UTC (rev 4202)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/CoreClientTest.java	2008-05-15 11:38:09 UTC (rev 4203)
@@ -86,179 +86,6 @@
       conn.close();
    }
    
-   public static void main(String[] args)
-   {
-      try
-      {
-         CoreClientTest test = new CoreClientTest();
-         
-         test.setUp();
-         test.testCoreClientPerf();
-         test.tearDown();
-      }
-      catch (Throwable t)
-      {
-         t.printStackTrace();
-      }
-   }
-   
-   public void testCoreClientPerf() throws Exception
-   {
-      Location location = new LocationImpl(TransportType.TCP, "localhost", ConfigurationImpl.DEFAULT_REMOTING_PORT);
-            
-      ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-      cf.setDefaultConsumerWindowSize(-1);
-   //   cf.setDefaultProducerMaxRate(30000);
-      
-      ClientConnection conn = cf.createConnection();
-      
-      final ClientSession session = conn.createClientSession(false, true, true, 1000, false, false);
-      session.createQueue(QUEUE, QUEUE, null, false, false);
-      
-      ClientProducer producer = session.createProducer(QUEUE);
-
-      ClientMessage message = new ClientMessageImpl(JBossTextMessage.TYPE, false, 0,
-            System.currentTimeMillis(), (byte) 1);
-      
-      byte[] bytes = new byte[1000];
-      
-      message.getBody().putBytes(bytes);
-      
-      message.getBody().flip();
-      
-      
-      ClientConsumer consumer = session.createConsumer(QUEUE);
-            
-      final CountDownLatch latch = new CountDownLatch(1);
-//      
-      final int numMessages = 100000000;
-      
-      class MyHandler implements MessageHandler
-      {
-         int count;
-
-         public void onMessage(ClientMessage msg)
-         {
-            count++;
-            
-            try
-            {
-               session.acknowledge();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-
-            if (count == numMessages)
-            {
-               latch.countDown();
-            }                        
-         }            
-      }
-
-      consumer.setMessageHandler(new MyHandler());
-      
-      
-            
-      //System.out.println("Waiting 10 secs");
-      
-     // Thread.sleep(10000);
-      
-      
-      
-      System.out.println("Starting");
-      
-      
-      //Warmup
-      for (int i = 0; i < 50000; i++)
-      {      
-         producer.send(message);
-      }
-//      
-//      System.out.println("Waiting 10 secs");
-//      
-//      Thread.sleep(10000);
-      
-      
-      long start = System.currentTimeMillis();
-            
-      for (int i = 0; i < numMessages; i++)
-      {      
-         producer.send(message);
-      }
-      
-      
-      
-            
-      //long end = System.currentTimeMillis();
-      
-      //double actualRate = 1000 * (double)numMessages / ( end - start);
-      
-      //System.out.println("Send Rate is " + actualRate);
-      
-//      long end = System.currentTimeMillis();
-//      
-//      double actualRate = 1000 * (double)numMessages / ( end - start);
-      
-   //   System.out.println("Rate is " + actualRate);
-      
-   //   conn.start();
-      
-     // start = System.currentTimeMillis();
-      
- //     latch.await();
-      
-//      long end = System.currentTimeMillis();
-//
-//      double actualRate = 1000 * (double)numMessages / ( end - start);
-//                  
-//      System.out.println("Rate is " + actualRate);
-
-      //conn.start();
-      
-      //System.out.println("Waiting 10 secs");
-      
-      
-      
-      long end = System.currentTimeMillis();
-      
-      double actualRate = 1000 * (double)numMessages / ( end - start);
-      
-      System.out.println("Rate is " + actualRate);
-      
-      //Thread.sleep(10000);
-      
-      
-      //      conn.start();
-//      
-//      
-//      start = System.currentTimeMillis();
-//
-      
-//      conn.start();
-////      
-//      start = System.currentTimeMillis();
-////      
-//      latch.await();
-////            
-////      
-//      end = System.currentTimeMillis();
-//      
-//      actualRate = 1000 * (double)numMessages / ( end - start);
-//      
-//      System.out.println("Rate is " + actualRate);
-      
-//      
-//      message = consumer.receive(1000);
-//      
-//      assertEquals("testINVMCoreClient", message.getBody().getString());
-//      
-      conn.close();
-   }
-
-
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list