[jboss-cvs] JBoss Messaging SVN: r5214 - in branches/Branch_Chunk_CRS2: tests/jms-tests/src/org/jboss/test/messaging/jms and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 29 18:51:42 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-29 18:51:42 -0400 (Wed, 29 Oct 2008)
New Revision: 5214

Modified:
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
Log:
Optimization (readAhead) & tweaks on tests

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-29 22:51:42 UTC (rev 5214)
@@ -26,7 +26,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -115,6 +114,10 @@
    /** The current position on the message being processed */
    private volatile long positionLargeMessage;
    
+   private volatile long sizeLargeMessage;
+   
+   private SessionSendChunkMessage readAheadChunk = null;
+   
 
    private final Channel channel;
 
@@ -234,6 +237,8 @@
                // Not replicated - just send now
                pendingLargeMessage = (ServerLargeMessage)message;
                positionLargeMessage = 0;
+               // Cache the body size once: getBodySize() usually means an operation on the file system
+               sizeLargeMessage = pendingLargeMessage.getBodySize();
                sendChunks();
 //            }
 //            else
@@ -485,78 +490,40 @@
          {
             return true;
          }
-         final long bodySize = pendingLargeMessage.getBodySize();
-   
-         int chunkLength = 0;
-   
-         SessionSendChunkMessage chunk = null;
-   
-         for (; positionLargeMessage < bodySize; positionLargeMessage += chunkLength)
+
+         if (readAheadChunk != null)
          {
+            positionLargeMessage += readAheadChunk.getBody().length;
+            channel.send(readAheadChunk);
+            readAheadChunk = null;
             
+         }
+         
+         while (positionLargeMessage < sizeLargeMessage)
+         {
+            
             if (availableCredits.get() <= 0)
             {
-               return false;
-            }
-   
-            if (positionLargeMessage == 0)
-            {
-               int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
-   
-               chunkLength = minLargeMessageSize - headerSize;
-   
-               MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
-               pendingLargeMessage.encodeProperties(headerBuffer);
-   
-               MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
-               pendingLargeMessage.encodeBody(bodyBuffer, 0, chunkLength);
-   
-               if (availableCredits != null)
+               
+               if (readAheadChunk == null)
                {
-                  availableCredits.addAndGet(-chunkLength);
+                  readAheadChunk = createChunkSend();
                }
-   
-               chunk = new SessionSendChunkMessage(id,
-                                                   headerBuffer.array(),
-                                                   bodyBuffer.array(),
-                                                   chunkLength < bodySize,
-                                                   false);
+               return false;
             }
-            else
+            
+            SessionSendChunkMessage chunk = createChunkSend();
+            
+            int chunkLen = chunk.getBody().length;
+
+            if (availableCredits != null)
             {
-               chunkLength = (int)Math.min(bodySize - positionLargeMessage, minLargeMessageSize);
-   
-               if (availableCredits != null)
-               {
-                  int leftCredits = availableCredits.addAndGet(-chunkLength);
-//                  if (leftCredits < 0)
-//                  {
-//                     if (chunkLength > 0)
-//                     {
-//                        availableCredits.addAndGet(-leftCredits);
-//                     }
-//                     else
-//                     {
-//                        // sanity check only.. it shouldn't happen
-//                        // This next statement means, we didn't have enough credit to send anything, so we return the credits and give up sending
-//                        availableCredits.addAndGet(chunkLength);
-//                        return false;
-//                     }
-//                  }
-               }
-   
-               MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
-   
-               pendingLargeMessage.encodeBody(bodyBuffer, positionLargeMessage, chunkLength);
-   
-               chunk = new SessionSendChunkMessage(id,
-                                                   null,
-                                                   bodyBuffer.array(),
-                                                   positionLargeMessage + chunkLength < bodySize,
-                                                   false);
+               availableCredits.addAndGet(-chunkLen);
             }
    
             channel.send(chunk);
+            
+            positionLargeMessage += chunkLen;
          }
    
          pendingLargeMessage.releaseResources();
@@ -574,6 +541,51 @@
 
    }
    
+   
+   private SessionSendChunkMessage createChunkSend()
+   {
+      SessionSendChunkMessage chunk;
+      
+      int localChunkLen = 0;
+      
+      if (positionLargeMessage == 0)
+      {
+         int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
+
+         localChunkLen = minLargeMessageSize - headerSize;
+
+         MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
+         pendingLargeMessage.encodeProperties(headerBuffer);
+
+         MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
+         pendingLargeMessage.encodeBody(bodyBuffer, 0, localChunkLen);
+
+
+         chunk = new SessionSendChunkMessage(id,
+                                             headerBuffer.array(),
+                                             bodyBuffer.array(),
+                                             localChunkLen < sizeLargeMessage,
+                                             false);
+      }
+      else
+      {
+         localChunkLen = (int)Math.min(sizeLargeMessage - positionLargeMessage, minLargeMessageSize);
+
+         MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
+
+         pendingLargeMessage.encodeBody(bodyBuffer, positionLargeMessage, localChunkLen);
+
+         chunk = new SessionSendChunkMessage(id,
+                                             null,
+                                             bodyBuffer.array(),
+                                             positionLargeMessage + localChunkLen < sizeLargeMessage,
+                                             false);
+      }
+      
+      return chunk;
+      
+   }
+   
    private void doClose() throws Exception
    {
       messageQueue.removeConsumer(this);

Modified: branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java	2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/jms-tests/src/org/jboss/test/messaging/jms/String64KLimitTest.java	2008-10-29 22:51:42 UTC (rev 5214)
@@ -112,19 +112,19 @@
          
          prod.send(tm4);
    
-         TextMessage rm1 = (TextMessage)cons.receive(60000);
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
          
          assertNotNull(rm1);           
          
-         TextMessage rm2 = (TextMessage)cons.receive(60000);
+         TextMessage rm2 = (TextMessage)cons.receive(1000);
          
          assertNotNull(rm2);
          
-         TextMessage rm3 = (TextMessage)cons.receive(60000);
+         TextMessage rm3 = (TextMessage)cons.receive(1000);
          
          assertNotNull(rm3);
          
-         TextMessage rm4 = (TextMessage)cons.receive(60000);
+         TextMessage rm4 = (TextMessage)cons.receive(1000);
          
          assertNotNull(rm4);
          

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-10-29 22:51:42 UTC (rev 5214)
@@ -87,7 +87,26 @@
                              final int waitOnConsumer,
                              final long delayDelivery) throws Exception
    {
+      testChunks(realFiles,
+                 useFile,
+                 numberOfMessages,
+                 numberOfIntegers,
+                 sendingBlocking,
+                 waitOnConsumer,
+                 delayDelivery,
+                 false);
+   }
 
+   protected void testChunks(final boolean realFiles,
+                             final boolean useFile,
+                             final int numberOfMessages,
+                             final int numberOfIntegers,
+                             final boolean sendingBlocking,
+                             final int waitOnConsumer,
+                             final long delayDelivery,
+                             final boolean testTime) throws Exception
+   {
+
       clearData();
 
       messagingService = createService(realFiles);
@@ -130,7 +149,10 @@
                   producer.send(message);
                }
 
-               System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+               if (testTime)
+               {
+                  System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+               }
             }
          }
          else
@@ -140,6 +162,7 @@
                ClientMessage message = session.createClientMessage(true);
                message.putIntProperty(new SimpleString("counter-message"), i);
                message.setBody(createLargeBuffer(numberOfIntegers));
+               long timeStart = System.currentTimeMillis();
                if (delayDelivery > 0)
                {
                   message.putLongProperty(new SimpleString("original-time"), System.currentTimeMillis());
@@ -149,6 +172,10 @@
                {
                   producer.send(message);
                }
+               if (testTime)
+               {
+                  System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+               }
             }
          }
 
@@ -178,7 +205,6 @@
          }
 
          session.start();
-         
 
          for (int i = 0; i < numberOfMessages; i++)
          {
@@ -187,8 +213,11 @@
             ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
 
             assertNotNull(message);
-            
-            System.out.println("Message received in " + (System.currentTimeMillis() - start));
+
+            if (testTime)
+            {
+               System.out.println("Message received in " + (System.currentTimeMillis() - start));
+            }
             start = System.currentTimeMillis();
 
             if (delayDelivery > 0)
@@ -202,26 +231,27 @@
 
             assertNotNull(message);
 
-            System.out.println("msg on client = " + message.getMessageID());
-
             if (delayDelivery <= 0)
             { // right now there is no guarantee of ordered delivered on multiple scheduledMessages
                assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
             }
 
-            if (message instanceof FileClientMessage)
+            if (!testTime)
             {
-               checkFileRead(((FileClientMessage)message).getFile(), numberOfIntegers);
-            }
-            else
-            {
-               MessagingBuffer buffer = message.getBody();
-               buffer.rewind();
-               assertEquals(numberOfIntegers * DataConstants.SIZE_INT, buffer.limit());
-               for (int b = 0; b < numberOfIntegers; b++)
+               if (message instanceof FileClientMessage)
                {
-                  assertEquals(b, buffer.getInt());
+                  checkFileRead(((FileClientMessage)message).getFile(), numberOfIntegers);
                }
+               else
+               {
+                  MessagingBuffer buffer = message.getBody();
+                  buffer.rewind();
+                  assertEquals(numberOfIntegers * DataConstants.SIZE_INT, buffer.limit());
+                  for (int b = 0; b < numberOfIntegers; b++)
+                  {
+                     assertEquals(b, buffer.getInt());
+                  }
+               }
             }
          }
 

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-29 22:51:42 UTC (rev 5214)
@@ -460,8 +460,6 @@
 
             message2.acknowledge();
 
-            System.out.println("msg on client = " + message2.getMessageID());
-
             assertNotNull(message2);
 
             try

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2008-10-29 22:13:35 UTC (rev 5213)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java	2008-10-29 22:51:42 UTC (rev 5214)
@@ -48,7 +48,7 @@
 
    public void testMessageChunkFilePersistence1G() throws Exception
    {
-      testChunks(true, true, 2, 268435456, false, 120000, 0);
+      testChunks(true, true, 2, 268435456, false, 120000, 0, true);
    }
 
    // Package protected ---------------------------------------------




More information about the jboss-cvs-commits mailing list