[hornetq-commits] JBoss hornetq SVN: r10366 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 25 15:31:43 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-25 15:31:43 -0400 (Fri, 25 Mar 2011)
New Revision: 10366

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6170 - Redelivery counters infinite updates

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -37,9 +37,11 @@
    private final PagePosition position;
 
    private WeakReference<PagedMessage> message;
-   
+
    private Long deliveryTime = null;
-   
+
+   private int persistedCount;
+
    private final PageSubscription subscription;
 
    public ServerMessage getMessage()
@@ -50,10 +52,10 @@
    public synchronized PagedMessage getPagedMessage()
    {
       PagedMessage returnMessage = message != null ? message.get() : null;
-      
+
       // We only keep a few references on the Queue from paging...
       // Besides those references are SoftReferenced on page cache...
-      // So, this will unlikely be null, 
+      // So, this will unlikely be null,
       // unless the Queue has stalled for some time after paging
       if (returnMessage == null)
       {
@@ -69,7 +71,9 @@
       return position;
    }
 
-   public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
+   public PagedReferenceImpl(final PagePosition position,
+                             final PagedMessage message,
+                             final PageSubscription subscription)
    {
       this.position = position;
       this.message = new WeakReference<PagedMessage>(message);
@@ -80,6 +84,16 @@
    {
       return true;
    }
+   
+   public void setPersistedCount(int count)
+   {
+      this.persistedCount = count;
+   }
+   
+   public int getPersistedCount()
+   {
+      return persistedCount;
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -744,16 +744,22 @@
 
    public void updateDeliveryCount(final MessageReference ref) throws Exception
    {
-      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
-                                                                               ref.getDeliveryCount());
+      // no need to store if it's the same value
+      // otherwise the journal will get OME in case of lots of redeliveries
+      if (ref.getDeliveryCount() != ref.getPersistedCount())
+      {
+         ref.setPersistedCount(ref.getDeliveryCount());
+         DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
+                                                                                  ref.getDeliveryCount());
+   
+         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+                                           JournalStorageManager.UPDATE_DELIVERY_COUNT,
+                                           updateInfo,
+   
+                                           syncNonTransactional,
+                                           getContext(syncNonTransactional));
+      }
 
-      messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
-                                        JournalStorageManager.UPDATE_DELIVERY_COUNT,
-                                        updateInfo,
-
-                                        syncNonTransactional,
-                                        getContext(syncNonTransactional));
-
    }
 
    public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -47,6 +47,10 @@
    int getDeliveryCount();
 
    void setDeliveryCount(int deliveryCount);
+   
+   void setPersistedCount(int deliveryCount);
+   
+   int getPersistedCount();
 
    void incrementDeliveryCount();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -645,18 +645,14 @@
 
       }
       
-      OperationContext formerCtx = null;
       // We close all the exception in an attempt to let any pending IO to finish
       // to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
       // It may still be possible to have this scenario on a real failure (without the use of XA)
       // But at least we will do our best to avoid it on regular shutdowns
       for (ServerSession session : sessions.values())
       {
-         storageManager.setContext(session.getSessionContext());
          session.close(true);
       }
-      
-      storageManager.setContext(formerCtx);
 
       remotingService.stop();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -230,6 +230,16 @@
          ref.setScheduledDeliveryTime(scheduledDeliveryTime);
       }
       
+      public void setPersistedCount(int count)
+      {
+         ref.setPersistedCount(count);
+      }
+      
+      public int getPersistedCount()
+      {
+         return ref.getPersistedCount();
+      }
+      
       public boolean isPaged()
       {
          return false;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -36,6 +36,8 @@
 
    private volatile int deliveryCount;
 
+   private volatile int persistedCount;
+
    private volatile long scheduledDeliveryTime;
 
    private final ServerMessage message;
@@ -91,6 +93,23 @@
    }
 
    // MessageReference implementation -------------------------------
+
+   /**
+    * @return the persistedCount
+    */
+   public int getPersistedCount()
+   {
+      return persistedCount;
+   }
+
+   /**
+    * @param persistedCount the persistedCount to set
+    */
+   public void setPersistedCount(int persistedCount)
+   {
+      this.persistedCount = persistedCount;
+   }
+
    public MessageReference copy(final Queue queue)
    {
       return new MessageReferenceImpl(this, queue);
@@ -109,6 +128,7 @@
    public void setDeliveryCount(final int deliveryCount)
    {
       this.deliveryCount = deliveryCount;
+      this.persistedCount = deliveryCount;
    }
 
    public void incrementDeliveryCount()
@@ -145,7 +165,7 @@
    {
       queue.referenceHandled();
    }
-   
+
    public boolean isPaged()
    {
       return false;
@@ -167,7 +187,6 @@
       queue.acknowledge(tx, this);
    }
 
-
    // Public --------------------------------------------------------
 
    @Override
@@ -175,7 +194,9 @@
    {
       return "Reference[" + getMessage().getMessageID() +
              "]:" +
-             (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") + ":" + getMessage() ;
+             (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") +
+             ":" +
+             getMessage();
    }
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -953,24 +953,35 @@
 
    public void close(final boolean failed)
    {
-      storageManager.afterCompleteOperations(new IOAsyncTask()
+      OperationContext formerCtx = storageManager.getContext();
+      
+      try
       {
-         public void onError(int errorCode, String errorMessage)
-         {
-         }
+         storageManager.setContext(sessionContext);
 
-         public void done()
+         storageManager.afterCompleteOperations(new IOAsyncTask()
          {
-            try
+            public void onError(int errorCode, String errorMessage)
             {
-               doClose(failed);
             }
-            catch (Exception e)
+   
+            public void done()
             {
-               log.error("Failed to close session", e);
+               try
+               {
+                  doClose(failed);
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to close session", e);
+               }
             }
-         }
-      });
+         });
+      }
+      finally
+      {
+         storageManager.setContext(formerCtx);
+      }
    }
 
    public void closeConsumer(final long consumerID) throws Exception

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java	2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java	2011-03-25 19:31:43 UTC (rev 10366)
@@ -13,13 +13,27 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.tests.util.ServiceTestBase;
 
@@ -223,6 +237,111 @@
       session.close();
    }
 
+
+   public void testInfiniteDedeliveryMessageOnPersistent() throws Exception
+   {
+      internaltestInfiniteDedeliveryMessageOnPersistent(false);
+   }
+   
+   private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean strict) throws Exception
+   {
+      setUp(strict);
+      ClientSession session = factory.createSession(false, false, false);
+
+      RedeliveryConsumerTest.log.info("created");
+
+      ClientProducer prod = session.createProducer(ADDRESS);
+      prod.send(createTextMessage(session, "Hello"));
+      session.commit();
+      session.close();
+
+      
+      int expectedCount = 1;
+      for (int i = 0 ; i < 700; i++)
+      {
+         session = factory.createSession(false, false, false);
+         session.start();
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         assertEquals(expectedCount, msg.getDeliveryCount());
+
+         if (i % 100 == 0)
+         {
+            expectedCount++;
+            msg.acknowledge();
+            session.rollback();
+         }
+         session.close();
+      }
+
+      factory.close();
+      server.stop();
+      
+      setUp(false);
+      
+      for (int i = 0 ; i < 700; i++)
+      {
+         session = factory.createSession(false, false, false);
+         session.start();
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         assertEquals(expectedCount, msg.getDeliveryCount());
+         session.close();
+      }
+
+      server.stop();
+
+      
+      JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 
+                                            2, 
+                                            0,
+                                            0, 
+                                            new NIOSequentialFileFactory(server.getConfiguration().getJournalDirectory()),
+                                            "hornetq-data",
+                                            "hq",
+                                            1);
+      
+      
+      final AtomicInteger updates = new AtomicInteger();
+      
+      journal.start();
+      journal.load(new LoaderCallback()
+      {
+         
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+         }
+         
+         public void updateRecord(RecordInfo info)
+         {
+            if (info.userRecordType == JournalStorageManager.UPDATE_DELIVERY_COUNT)
+            {
+               updates.incrementAndGet();
+            }
+         }
+         
+         public void deleteRecord(long id)
+         {
+         }
+         
+         public void addRecord(RecordInfo info)
+         {
+         }
+         
+         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+         {
+         }
+      });
+      
+      journal.stop();
+      
+      
+      assertEquals(7, updates.get());
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -244,7 +363,14 @@
       factory = locator.createSessionFactory();
 
       ClientSession session = factory.createSession(false, false, false);
-      session.createQueue(ADDRESS, ADDRESS, true);
+      try
+      {
+         session.createQueue(ADDRESS, ADDRESS, true);
+      }
+      catch (HornetQException expected)
+      {
+         // in case of restart
+      }
 
       session.close();
    }



More information about the hornetq-commits mailing list