[hornetq-commits] JBoss hornetq SVN: r8586 - in trunk: src/main/org/hornetq/core/replication/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Dec 6 01:58:34 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-06 01:58:34 -0500 (Sun, 06 Dec 2009)
New Revision: 8586

Modified:
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
Log:
Fixed bug with duplicate detection on depage and transactional send

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-12-05 19:37:17 UTC (rev 8585)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-12-06 06:58:34 UTC (rev 8586)
@@ -49,6 +49,7 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.SimpleString;
 
@@ -282,7 +283,7 @@
    }
 
    public void addSize(final ServerMessage message, final boolean add)
-   {            
+   {
       long size = message.getMemoryEstimate();
 
       if (add)
@@ -302,7 +303,7 @@
    public void addSize(final MessageReference reference, final boolean add)
    {
       long size = MessageReferenceImpl.getMemoryEstimate();
-      
+
       if (add)
       {
          checkReleaseProducerFlowControlCredits(size);
@@ -400,9 +401,9 @@
       if (running)
       {
          running = false;
-         
+
          final CountDownLatch latch = new CountDownLatch(1);
-         
+
          executor.execute(new Runnable()
          {
             public void run()
@@ -410,10 +411,10 @@
                latch.countDown();
             }
          });
-         
+
          if (!latch.await(60, TimeUnit.SECONDS))
          {
-            log.warn("Timed out on waiting PagingStore "  + this.address + " to shutdown");
+            log.warn("Timed out on waiting PagingStore " + this.address + " to shutdown");
          }
 
          if (currentPage != null)
@@ -726,7 +727,7 @@
       }
    }
 
-   private void addSize(final long size) 
+   private void addSize(final long size)
    {
       if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
       {
@@ -948,55 +949,67 @@
 
          final long transactionIdDuringPaging = pagedMessage.getTransactionID();
 
+         postOffice.route(message, depageTransaction);
+
+         // This means the page is duplicated. So we need to ignore this
+         if (depageTransaction.getState() == State.ROLLBACK_ONLY)
+         {
+            break;
+         }
+
+         PageTransactionInfo pageUserTransaction = null;
+
          if (transactionIdDuringPaging >= 0)
          {
-            final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
+            pageUserTransaction = pagingManager.getTransaction(transactionIdDuringPaging);
 
-            if (pageTransactionInfo == null)
+            if (pageUserTransaction == null)
             {
-               log.warn("Transaction " + pagedMessage.getTransactionID() +
-                        " used during paging not found, ignoring message " +
-                        message);
+               // This is not supposed to happen
+               log.warn("Transaction " + pagedMessage.getTransactionID() + " used during paging not found");
                continue;
             }
+            else
+            {
 
-            // This is to avoid a race condition where messages are depaged
-            // before the commit arrived
+               // This is to avoid a race condition where messages are depaged
+               // before the commit arrived
 
-            while (running && !pageTransactionInfo.waitCompletion(500))
-            {
-               // This is just to give us a chance to interrupt the process..
-               // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
-               // the shutdown of the server
-               if (isTrace)
+               while (running && !pageUserTransaction.waitCompletion(500))
                {
-                  trace("Waiting pageTransaction to complete");
+                  // This is just to give us a chance to interrupt the process..
+                  // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
+                  // the shutdown of the server
+                  if (isTrace)
+                  {
+                     trace("Waiting pageTransaction to complete");
+                  }
                }
-            }
 
-            if (!running)
-            {
-               break;
-            }
+               if (!running)
+               {
+                  break;
+               }
 
-            if (!pageTransactionInfo.isCommit())
-            {
-               if (isTrace)
+               if (!pageUserTransaction.isCommit())
                {
-                  trace("Rollback was called after prepare, ignoring message " + message);
+                  if (isTrace)
+                  {
+                     trace("Rollback was called after prepare, ignoring message " + message);
+                  }
+                  continue;
                }
-               continue;
             }
 
-            // Update information about transactions
-            if (message.isDurable())
-            {
-               pageTransactionInfo.decrement();
-               pageTransactionsToUpdate.add(pageTransactionInfo);
-            }
          }
 
-         postOffice.route(message, depageTransaction);
+         // Update information about transactions
+         // This needs to be done after routing because of duplication detection
+         if (pageUserTransaction != null && message.isDurable())
+         {
+            pageUserTransaction.decrement();
+            pageTransactionsToUpdate.add(pageUserTransaction);
+         }
       }
 
       if (!running)
@@ -1023,7 +1036,7 @@
       }
 
       depageTransaction.commit();
-      
+
       storageManager.waitOnOperations();
 
       if (isTrace)

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-12-05 19:37:17 UTC (rev 8585)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-12-06 06:58:34 UTC (rev 8586)
@@ -95,6 +95,9 @@
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
 
    private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
+   
+   // Used on tests, to simulate failures on delete pages
+   private boolean deletePages = true;
 
    // Constructors --------------------------------------------------
    public ReplicationEndpointImpl(final HornetQServer server)
@@ -285,6 +288,12 @@
       }
 
    }
+   
+   /** Used on tests only. To simulate missing page deletes*/
+   public void setDeletePages(final boolean deletePages)
+   {
+      this.deletePages = deletePages;
+   }
 
    /**
     * @param journalInformation
@@ -504,7 +513,10 @@
       {
          if (packet.isDelete())
          {
-            page.delete();
+            if (deletePages)
+            {
+               page.delete();
+            }
          }
          else
          {

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-12-05 19:37:17 UTC (rev 8585)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-12-06 06:58:34 UTC (rev 8586)
@@ -854,6 +854,12 @@
    // Public
    // ---------------------------------------------------------------------------------------
 
+   /** For tests only */
+   public ReplicationEndpoint getReplicationEndpoint()
+   {
+      return this.replicationEndpoint;
+   }
+   
    // Package protected
    // ----------------------------------------------------------------------------
 

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2009-12-05 19:37:17 UTC (rev 8585)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2009-12-06 06:58:34 UTC (rev 8586)
@@ -17,7 +17,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.hornetq.core.buffers.HornetQBuffers;
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientProducer;
@@ -29,7 +28,9 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.utils.SimpleString;
 
@@ -120,10 +121,20 @@
 
          session.commit();
 
+         ReplicationEndpointImpl endpoint = null;
+
          if (failBeforeConsume)
          {
             failSession(session, latch);
          }
+         else
+         {
+            endpoint = (ReplicationEndpointImpl)((HornetQServerImpl)server1Service).getReplicationEndpoint();
+            if (endpoint != null)
+            {
+               endpoint.setDeletePages(false);
+            }
+         }
 
          session.start();
 
@@ -145,6 +156,11 @@
 
          session.commit();
 
+         if (endpoint != null)
+         {
+            endpoint.setDeletePages(true);
+         }
+
          if (!failBeforeConsume)
          {
             failSession(session, latch);

Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2009-12-05 19:37:17 UTC (rev 8585)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2009-12-06 06:58:34 UTC (rev 8586)
@@ -69,18 +69,31 @@
 
    public void testCrashDuringDeleteFile() throws Exception
    {
-      pageAndFail();
+      doTestCrashDuringDeleteFile(false);
+   }
 
+   public void testCrashDuringDeleteFileTransacted() throws Exception
+   {
+      doTestCrashDuringDeleteFile(true);
+   }
+
+   public void doTestCrashDuringDeleteFile(final boolean transacted) throws Exception
+   {
+      pageAndFail(transacted);
+
       File pageDir = new File(getPageDir());
 
       File directories[] = pageDir.listFiles();
 
       assertEquals(1, directories.length);
 
-      // When depage happened, a new empty page was supposed to be opened, what will create 3 files
-      assertEquals("Missing a file, supposed to have address.txt, 1st page and 2nd page",
-                   3,
-                   directories[0].list().length);
+      if (!transacted)
+      {
+         // When depage happened, a new empty page was supposed to be opened, what will create 3 files
+         assertEquals("Missing a file, supposed to have address.txt, 1st page and 2nd page",
+                      3,
+                      directories[0].list().length);
+      }
 
       Configuration config = createDefaultConfig();
 
@@ -122,7 +135,7 @@
    /** This method will leave garbage on paging. 
     *  It will not delete page files as if the server crashed right after commit, 
     *  and before removing the file*/
-   private void pageAndFail() throws Exception
+   private void pageAndFail(final boolean transacted) throws Exception
    {
       clearData();
       Configuration config = createDefaultConfig();
@@ -142,7 +155,7 @@
          sf.setBlockOnPersistentSend(true);
          sf.setBlockOnAcknowledge(true);
 
-         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+         ClientSession session = sf.createSession(null, null, false, !transacted, !transacted, false, 0);
 
          session.createQueue(ADDRESS, ADDRESS, null, true);
 
@@ -150,7 +163,7 @@
 
          ClientMessage message = session.createClientMessage(true);
          message.getBodyBuffer().writeBytes(new byte[1024]);
-         
+
          PagingStore store = server.getPostOffice().getPagingManager().getPageStore(ADDRESS);
 
          int messages = 0;
@@ -158,6 +171,10 @@
          {
             producer.send(message);
             messages++;
+            if (transacted && messages % 100 == 0)
+            {
+               session.commit();
+            }
          }
 
          for (int i = 0; i < 2; i++)
@@ -166,6 +183,8 @@
             producer.send(message);
          }
 
+         session.commit();
+
          session.close();
 
          assertTrue(server.getPostOffice().getPagingManager().getTotalMemory() > 0);



More information about the hornetq-commits mailing list