[jboss-cvs] JBoss Messaging SVN: r5714 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 23 18:48:02 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-23 18:48:02 -0500 (Fri, 23 Jan 2009)
New Revision: 5714

Modified:
   trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1479 - re-enabling/fix drop messages

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2009-01-23 23:48:02 UTC (rev 5714)
@@ -80,5 +80,5 @@
     * @return
     * @throws Exception 
     */
-   boolean addSize(long memoryEstimate) throws Exception;
+   void addSize(long memoryEstimate) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-23 23:48:02 UTC (rev 5714)
@@ -204,7 +204,14 @@
       currentPageLock.readLock().lock();
       try
       {
-         return currentPage != null;
+         if (isDropWhenMaxSize())
+         {
+            return isDrop();
+         }
+         else
+         {
+            return currentPage != null;
+         }
       }
       finally
       {
@@ -221,35 +228,19 @@
    {
       return storeName;
    }
-
-   public boolean addSize(final long size) throws Exception
+   
+   public void addSize(final long size) throws Exception
    {
       final long maxSize = getMaxSizeBytes();
 
       final long pageSize = getPageSizeBytes();
 
-      if (isDropWhenMaxSize() && size > 0)
+      if (isDropWhenMaxSize())
       {
-         // if destination configured to drop messages && size is over the
-         // limit, we return -1 which means drop the message
-         if (getAddressSize() + size > maxSize || pagingManager.getMaxGlobalSize() > 0 &&
-             pagingManager.getGlobalSize() + size > pagingManager.getMaxGlobalSize())
-         {
-            if (!printedDropMessagesWarning)
-            {
-               printedDropMessagesWarning = true;
+         addAddressSize(size);
+         pagingManager.addGlobalSize(size);
 
-               log.warn("Messages are being dropped on adress " + getStoreName());
-            }
-
-            return false;
-         }
-         else
-         {
-            addAddressSize(size);
-
-            return true;
-         }
+         return;
       }
       else
       {
@@ -303,7 +294,8 @@
                          (maxGlobalSize - pagingManager.getDefaultPageSize()));
             }
 
-            if (maxGlobalSize > 0 && pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
+            if (maxGlobalSize > 0 && pagingManager.isGlobalPageMode() &&
+                currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
             {
                pagingManager.startGlobalDepage();
             }
@@ -316,7 +308,7 @@
             }
          }
 
-         return true;
+         return;
       }
    }
 
@@ -328,10 +320,24 @@
          throw new IllegalStateException("PagingStore(" + getStoreName() + ") not initialized");
       }
 
-      // We should never page when drop-messages is activated.
       if (dropMessagesWhenFull)
       {
-         return false;
+         if (isDrop())
+         {
+            if (!printedDropMessagesWarning)
+            {
+               printedDropMessagesWarning = true;
+
+               log.warn("Messages are being dropped on adress " + getStoreName());
+            }
+
+            // Address is full, we just pretend we are paging, and drop the data
+            return true;
+         }
+         else
+         {
+            return false;
+         }
       }
 
       // We need to ensure a read lock, as depage could change the paging state
@@ -455,7 +461,7 @@
       {
          return false;
       }
-      
+
       if (pagingManager.isBackup())
       {
          return false;
@@ -820,29 +826,29 @@
       // back to where it was
 
       Transaction depageTransaction = new TransactionImpl(storageManager);
-      
+
       SendLock sendLock = postOffice.getAddressLock(destination);
-      
+
       sendLock.beforeSend();
-      
+
       try
       {
          depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
-   
+
          HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-         
+
          for (PagedMessage pagedMessage : pagedMessages)
          {
             ServerMessage message = null;
-   
+
             message = pagedMessage.getMessage(storageManager);
-   
+
             final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-   
+
             if (transactionIdDuringPaging >= 0)
             {
                final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
-   
+
                // http://wiki.jboss.org/wiki/JBossMessaging2Paging
                // This is the Step D described on the "Transactions on Paging"
                // section
@@ -851,28 +857,29 @@
                   log.warn("Transaction " + pagedMessage.getTransactionID() +
                            " used during paging not found, ignoring message " +
                            message);
-   
+
                   continue;
                }
-   
+
                // This is to avoid a race condition where messages are depaged
                // before the commit arrived
-               
+
                while (running && !pageTransactionInfo.waitCompletion(500))
                {
                   // This is just to give us a chance to interrupt the process..
-                  // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying the shutdown of the server
+                  // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
+                  // the shutdown of the server
                   if (isTrace)
                   {
                      trace("Waiting pageTransaction to complete");
                   }
                }
-               
+
                if (!running)
                {
                   break;
                }
-               
+
                if (!pageTransactionInfo.isCommit())
                {
                   if (isTrace)
@@ -881,7 +888,7 @@
                   }
                   continue;
                }
-   
+
                // Update information about transactions
                if (message.isDurable())
                {
@@ -889,21 +896,21 @@
                   pageTransactionsToUpdate.add(pageTransactionInfo);
                }
             }
-   
+
             postOffice.route(message, depageTransaction);
          }
-         
+
          if (!running)
          {
             depageTransaction.rollback();
             return false;
          }
-   
+
          for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
          {
             // This will set the journal transaction to commit;
             depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-   
+
             if (pageWithTransaction.getNumberOfMessages() == 0)
             {
                // http://wiki.jboss.org/wiki/JBossMessaging2Paging
@@ -916,9 +923,9 @@
                storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
             }
          }
-   
+
          depageTransaction.commit();
-   
+
          if (isTrace)
          {
             trace("Depage committed, running = " + running);
@@ -928,7 +935,7 @@
       {
          sendLock.afterSend();
       }
-      
+
       return true;
    }
 
@@ -1027,6 +1034,14 @@
       return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
    }
 
+   // To be used on isDropMessagesWhenFull
+   private boolean isDrop()
+   {
+      return (getMaxSizeBytes() > 0 && getAddressSize() > getMaxSizeBytes()) ||
+             (pagingManager.getMaxGlobalSize() > 0 && pagingManager.getGlobalSize() > pagingManager.getMaxGlobalSize());
+   }
+
+
    // Inner classes -------------------------------------------------
 
    private class DepageRunnable implements Runnable
@@ -1050,7 +1065,8 @@
                }
 
                // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
-               // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding the lock and this would dead lock
+               // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
+               // the lock and this would dead lock
                if (running && !clearDepage())
                {
                   followingExecutor.execute(this);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-23 23:48:02 UTC (rev 5714)
@@ -107,48 +107,6 @@
    }
 
 
-   public void testPagingManagerAddressFull() throws Exception
-   {
-      HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
-      queueSettings.setDefault(new QueueSettings());
-
-      QueueSettings simpleTestSettings = new QueueSettings();
-      simpleTestSettings.setDropMessagesWhenFull(true);
-      simpleTestSettings.setMaxSizeBytes(200);
-
-      queueSettings.addMatch("simple-test", simpleTestSettings);
-
-      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getJournalDir(), 10),
-                                                            null,
-                                                            queueSettings,
-                                                            -1,
-                                                            1024 * 1024,
-                                                            false,
-                                                            false);
-      managerImpl.start();
-
-      managerImpl.createPageStore(new SimpleString("simple-test"));
-
-      ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
-
-      assertTrue(managerImpl.getPageStore(msg.getDestination()).addSize(msg.getMemoryEstimate()));
-
-      for (int i = 0; i < 10; i++)
-      {
-         long currentSize = managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize();
-         assertFalse(managerImpl.getPageStore(msg.getDestination()).addSize(msg.getMemoryEstimate()));
-
-         // should be unchanged
-         assertEquals(currentSize, managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize());
-      }
-
-      managerImpl.getPageStore(msg.getDestination()).addSize(-msg.getMemoryEstimate());
-
-      assertTrue(managerImpl.getPageStore(msg.getDestination()).addSize(msg.getMemoryEstimate()));
-
-      managerImpl.stop();
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java	2009-01-23 23:48:02 UTC (rev 5714)
@@ -74,6 +74,7 @@
    {
       super.tearDown();
    }
+
    public void testSendReceivePaging() throws Exception
    {
       clearData();
@@ -637,18 +638,197 @@
       }
 
    }
-   
+
    public void testPageMultipleDestinations() throws Exception
    {
       internalTestPageMultipleDestinations(false);
    }
 
-   
    public void testPageMultipleDestinationsTransacted() throws Exception
    {
       internalTestPageMultipleDestinations(true);
    }
-   
+
+   public void testDropMessagesQueueMax() throws Exception
+   {
+      testDropMessages(false);
+   }
+
+   public void testDropMessagesGlobalMax() throws Exception
+   {
+      testDropMessages(true);
+   }
+
+   private void testDropMessages(boolean global) throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      if (global)
+      {
+         config.setPagingMaxGlobalSizeBytes(10 * 1024);
+      }
+      else
+      {
+         config.setPagingMaxGlobalSizeBytes(-1);
+      }
+
+      config.setPagingDefaultSize(10 * 1024);
+
+      HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+
+      QueueSettings set = new QueueSettings();
+      set.setDropMessagesWhenFull(true);
+      if (!global)
+      {
+         set.setMaxSizeBytes(10 * 1024);
+      }
+      else
+      {
+         set.setMaxSizeBytes(-1);
+         
+      }
+      settings.put(ADDRESS.toString(), set);
+
+      MessagingService messagingService = createService(true, config, settings);
+
+      messagingService.start();
+
+      final int sizeOfMessage = 1024;
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonPersistentSend(true);
+         sf.setBlockOnPersistentSend(true);
+         sf.setBlockOnAcknowledge(true);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ByteBuffer ioBuffer = ByteBuffer.allocate(sizeOfMessage);
+
+         ClientMessage message = null;
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+
+            producer.send(message);
+         }
+
+         session = sf.createSession(null, null, false, true, true, false, 0);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < 9; i++)
+         {
+            ClientMessage message2 = consumer.receive(10000);
+
+            assertNotNull(message2);
+
+            message2.acknowledge();
+         }
+
+         assertNull(consumer.receive(100));
+
+         assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+         assertEquals(0, messagingService.getServer()
+                                         .getPostOffice()
+                                         .getPagingManager()
+                                         .getPageStore(ADDRESS)
+                                         .getAddressSize());
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+
+            producer.send(message);
+         }
+
+         for (int i = 0; i < 9; i++)
+         {
+            ClientMessage message2 = consumer.receive(10000);
+
+            assertNotNull(message2);
+
+            message2.acknowledge();
+         }
+
+         assertNull(consumer.receive(100));
+
+         session.close();
+
+         session = sf.createSession(false, true, true);
+
+         producer = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+            message = session.createClientMessage(true);
+            message.setBody(bodyLocal);
+
+            producer.send(message);
+         }
+
+         session.commit();
+
+         consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < 9; i++)
+         {
+            ClientMessage message2 = consumer.receive(10000);
+
+            assertNotNull(message2);
+
+            message2.acknowledge();
+         }
+
+         session.commit();
+
+         assertNull(consumer.receive(100));
+
+         session.close();
+
+         assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+         assertEquals(0, messagingService.getServer()
+                                         .getPostOffice()
+                                         .getPagingManager()
+                                         .getPageStore(ADDRESS)
+                                         .getAddressSize());
+
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    private void internalTestPageMultipleDestinations(boolean transacted) throws Exception
    {
       Configuration config = createDefaultConfig();
@@ -695,8 +875,8 @@
          for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
          {
             producer.send(message);
-            
-            if (transacted) 
+
+            if (transacted)
             {
                session.commit();
             }




More information about the jboss-cvs-commits mailing list