[hornetq-commits] JBoss hornetq SVN: r10450 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/journal/impl and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Apr 4 17:19:27 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-04 17:19:27 -0400 (Mon, 04 Apr 2011)
New Revision: 10450

Added:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
JBPAPP-6220 - page syncs and transactions

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/Journal.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -75,6 +75,15 @@
 
    void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception;
 
+   /**
+    * @param txID
+    * @param sync
+    * @param callback
+    * @param useLineUp if appendCommitRecord should call a storeLineUp. This is because the caller may have already taken into account
+    * @throws Exception
+    */
+   void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception;
+
    /** 
     * 
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
@@ -106,6 +115,8 @@
     *  This is only useful if you're using the journal but not interested on the current data.
     *  Useful in situations where the journal is being replicated, copied... etc. */
    JournalLoadInformation loadInternalOnly() throws Exception;
+   
+   void lineUpContex(IOCompletion callback);
 
    JournalLoadInformation load(List<RecordInfo> committedRecords,
                                List<PreparedTransactionInfo> preparedTransactions,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -1289,7 +1289,7 @@
    {
       SyncIOCompletion syncCompletion = getSyncCallback(sync);
 
-      appendCommitRecord(txID, sync, syncCompletion);
+      appendCommitRecord(txID, sync, syncCompletion, true);
 
       if (syncCompletion != null)
       {
@@ -1297,6 +1297,20 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+    */
+   public void lineUpContex(IOCompletion callback)
+   {
+      callback.storeLineUp();
+   }
+
+   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+   {
+      appendCommitRecord(txID, sync, callback, true);
+   }
+
+   
    /**
     * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
     * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file. 
@@ -1314,7 +1328,7 @@
     *
     */
 
-   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception
    {
       if (state != JournalImpl.STATE_LOADED)
       {
@@ -1334,7 +1348,7 @@
 
          JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID, null);
 
-         if (callback != null)
+         if (callback != null && lineUpContext)
          {
             callback.storeLineUp();
          }

Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.utils.ExecutorFactory;
+
+/**
+ * A PrintPage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PrintPages
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public static void main(final String arg[])
+   {
+      if (arg.length != 2)
+      {
+         System.err.println("Usage: PrintPages <page foler> <journal folder>");
+      }
+      try
+      {
+
+         Map<Long, Set<PagePosition>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+
+         ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+         final ExecutorService executor = Executors.newFixedThreadPool(10);
+         ExecutorFactory execfactory = new ExecutorFactory()
+         {
+
+            public Executor getExecutor()
+            {
+               return executor;
+            }
+         };
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false);
+         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+         addressSettingsRepository.setDefault(new AddressSettings());
+         StorageManager sm = new NullStorageManager();
+         PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
+
+         manager.start();
+
+         SimpleString stores[] = manager.getStoreNames();
+
+         for (SimpleString store : stores)
+         {
+            System.out.println("####################################################################################################");
+            System.out.println("Exploring store " + store);
+            PagingStore pgStore = manager.getPageStore(store);
+            int pgid = (int)pgStore.getFirstPage();
+            for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
+            {
+               System.out.println("*******   Page " + pgid);
+               Page page = pgStore.createPage(pgid);
+               page.open();
+               List<PagedMessage> msgs = page.read();
+               page.close();
+
+               int msgID = 0;
+
+               for (PagedMessage msg : msgs)
+               {
+                  msg.initMessage(sm);
+                  System.out.print("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+                  System.out.print(",Queues = ");
+                  long q[] = msg.getQueueIDs();
+                  for (int i = 0; i < q.length; i++)
+                  {
+                     System.out.print(q[i]);
+
+                     PagePosition posCheck = new PagePositionImpl(pgid, msgID);
+
+                     boolean acked = false;
+
+                     Set<PagePosition> positions = cursorACKs.get(q[i]);
+                     if (positions != null)
+                     {
+                        acked = positions.contains(posCheck);
+                     }
+
+                     if (acked)
+                     {
+                        System.out.print(" (ACK)");
+                     }
+
+                     if (i + 1 < q.length)
+                     {
+                        System.out.print(",");
+                     }
+                  }
+                  System.out.println();
+                  msgID++;
+               }
+
+               pgid++;
+
+            }
+         }
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   /**
+    * @param journalLocation
+    * @return
+    * @throws Exception
+    */
+   protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String journalLocation) throws Exception
+   {
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+
+      // Will use only default values. The load function should adapt to anything different
+      ConfigurationImpl defaultValues = new ConfigurationImpl();
+
+      JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
+                                                    defaultValues.getJournalMinFiles(),
+                                                    0,
+                                                    0,
+                                                    messagesFF,
+                                                    "hornetq-data",
+                                                    "hq",
+                                                    1);
+
+      messagesJournal.start();
+
+      ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+      ArrayList<PreparedTransactionInfo> txs = new ArrayList<PreparedTransactionInfo>();
+
+      messagesJournal.load(records, txs, null);
+
+      Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+      for (RecordInfo record : records)
+      {
+         if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
+         {
+            byte[] data = record.data;
+
+            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+            CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+            encoding.decode(buff);
+
+            Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+            if (set == null)
+            {
+               set = new HashSet<PagePosition>();
+               cursorRecords.put(encoding.queueID, set);
+            }
+
+            set.add(encoding.position);
+         }
+      }
+      return cursorRecords;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -885,23 +885,14 @@
  
          currentPage.write(pagedMessage);
 
-         if (tx != null)
+         if (sync || tx != null)
          {
-            SyncPageStoreTX syncPage = (SyncPageStoreTX)tx.getProperty(TransactionPropertyIndexes.PAGE_SYNC);
-            if (syncPage == null)
-            {
-               syncPage = new SyncPageStoreTX();
-               tx.putProperty(TransactionPropertyIndexes.PAGE_SYNC, syncPage);
-               tx.addOperation(syncPage);
-            }
-            syncPage.addStore(this);
+            sync();
          }
-         else
+         
+         if (tx != null)
          {
-            if (sync)
-            {
-               sync();
-            }
+            tx.setWaitBeforeCommit(true);
          }
 
          return true;
@@ -957,63 +948,6 @@
       }
    }
 
-   private static class SyncPageStoreTX extends TransactionOperationAbstract
-   {
-      Set<PagingStore> storesToSync = new HashSet<PagingStore>();
-
-      public void addStore(PagingStore store)
-      {
-         storesToSync.add(store);
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
-       */
-      public void beforePrepare(Transaction tx) throws Exception
-      {
-         sync();
-      }
-
-      void sync() throws Exception
-      {
-         OperationContext originalTX = OperationContextImpl.getContext();
-
-         try
-         {
-            // We only want to sync paging here, no need to wait for any other events
-            OperationContextImpl.clearContext();
-
-            for (PagingStore store : storesToSync)
-            {
-               store.sync();
-            }
-
-            // We can't perform a commit/sync on the journal before we can assure page files are synced or we may get
-            // out of sync
-            OperationContext ctx = OperationContextImpl.getContext();
-
-            if (ctx != null)
-            {
-               // if null it means there were no operations done before, hence no need to wait any completions
-               ctx.waitCompletion();
-            }
-         }
-         finally
-         {
-            OperationContextImpl.setContext(originalTX);
-         }
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
-       */
-      public void beforeCommit(Transaction tx) throws Exception
-      {
-         sync();
-      }
-   }
-
    private class FinishPageMessageOperation implements TransactionOperation
    {
       private final PageTransactionInfo pageTransaction;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -54,6 +54,8 @@
 
    /** Get the context associated with the thread for later reuse */
    OperationContext getContext();
+   
+   void lineUpContext();
 
    /** It just creates an OperationContext without associating it */
    OperationContext newContext(Executor executor);
@@ -146,6 +148,8 @@
 
    void commit(long txID) throws Exception;
 
+   void commit(long txID, boolean lineUpContext) throws Exception;
+
    void rollback(long txID) throws Exception;
 
    void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -707,9 +707,19 @@
 
    public void commit(final long txID) throws Exception
    {
-      messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional));
+      commit(txID, true);
    }
 
+   public void commit(final long txID, final boolean lineUpContext) throws Exception
+   {
+      messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
+      if (!lineUpContext && !syncTransactional)
+      {
+         // if lineUpContext == false, we have previously lined up a context, hence we need to mark it as done even if syncTransactional = false
+         getContext(true).done();
+      }
+   }
+
    public void rollback(final long txID) throws Exception
    {
       messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional));
@@ -1420,7 +1430,17 @@
 
       return bindingsInfo;
    }
+   
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+    */
+   public void lineUpContext()
+   {
+      messageJournal.lineUpContex(getContext());
+   }
+
+
    // HornetQComponent implementation
    // ------------------------------------------------------
 
@@ -2696,7 +2716,7 @@
       int deliveryCount;
    }
 
-   private static final class CursorAckRecordEncoding implements EncodingSupport
+   public static final class CursorAckRecordEncoding implements EncodingSupport
    {
       public CursorAckRecordEncoding(final long queueID, final PagePosition position)
       {
@@ -2718,9 +2738,9 @@
          return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
       }
 
-      long queueID;
+      public long queueID;
 
-      PagePosition position;
+      public PagePosition position;
 
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -288,6 +288,13 @@
       return file;
    }
 
+   @Override
+   public String toString()
+   {
+      return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -353,9 +353,12 @@
    public String toString()
    {
       StringBuffer buffer = new StringBuffer();
-      for (TaskHolder hold : tasks)
+      if (tasks != null)
       {
-         buffer.append("Task = " + hold + "\n");
+         for (TaskHolder hold : tasks)
+         {
+            buffer.append("Task = " + hold + "\n");
+         }
       }
       
       return "OperationContextImpl [minimalStore=" + minimalStore +

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -151,6 +151,13 @@
       return getHeadersAndPropertiesEncodeSize();
    }
 
+   @Override
+   public String toString()
+   {
+      return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -553,4 +553,20 @@
       return 0;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
+    */
+   public void commit(long txID, boolean lineUpContext) throws Exception
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+    */
+   public void lineUpContext()
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/ReplicationManager.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -44,7 +44,7 @@
 
    void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
 
-   void appendCommitRecord(byte journalID, long txID) throws Exception;
+   void appendCommitRecord(byte journalID, long txID, boolean lineUp) throws Exception;
 
    void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -25,6 +25,7 @@
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.replication.ReplicationManager;
 
@@ -172,7 +173,7 @@
       {
          ReplicatedJournal.trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID);
+      replicationManager.appendCommitRecord(journalID, txID, true);
       localJournal.appendCommitRecord(txID, sync);
    }
 
@@ -185,10 +186,25 @@
       {
          ReplicatedJournal.trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID);
+      replicationManager.appendCommitRecord(journalID, txID, true);
       localJournal.appendCommitRecord(txID, sync, callback);
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion, boolean)
+    */
+   public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+   {
+      if (ReplicatedJournal.trace)
+      {
+         ReplicatedJournal.trace("AppendCommit " + txID);
+      }
+      replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
+      localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
+      
+   }
+
+   
    /**
     * @param id
     * @param sync
@@ -544,6 +560,16 @@
       return localJournal.getUserVersion();
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+    */
+   public void lineUpContex(IOCompletion callback)
+   {
+      ((OperationContext)callback).replicationLineUp();
+      localJournal.lineUpContex(callback);
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -165,11 +165,11 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
     */
-   public void appendCommitRecord(final byte journalID, final long txID) throws Exception
+   public void appendCommitRecord(final byte journalID, final long txID, final boolean lineUp) throws Exception
    {
       if (enabled)
       {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
       }
    }
 
@@ -440,10 +440,18 @@
 
    private void sendReplicatePacket(final Packet packet)
    {
+      sendReplicatePacket(packet, true);
+   }
+
+   private void sendReplicatePacket(final Packet packet, boolean lineUp)
+   {
       boolean runItNow = false;
 
       OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
-      repliToken.replicationLineUp();
+      if (lineUp)
+      {
+         repliToken.replicationLineUp();
+      }
 
       synchronized (replicationLock)
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/Transaction.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -65,12 +65,10 @@
    
    boolean hasTimedOut(long currentTime, int defaultTimeout);
    
-   /** We don't want to look on operations at every send, so we keep the paging attribute and will only look at 
-    *  the PagingOperation case this attribute is true*/
-   boolean isPaging();
-   
-   void setPaging(boolean paging);
+   boolean isWaitBeforeCommit();
 
+   void setWaitBeforeCommit(boolean waitBeforeCommit);
+
    void putProperty(int index, Object property);
 
    Object getProperty(int index);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
@@ -48,7 +49,10 @@
 
    private final long id;
    
-   private boolean paging = false;
+   /**
+    * if the appendCommit has to be done only after the current operations are completed
+    */
+   private boolean waitBeforeCommit = false;
 
    private volatile State state = State.ACTIVE;
 
@@ -259,8 +263,17 @@
 
          if (containsPersistent || xid != null && state == State.PREPARED)
          {
-            storageManager.commit(id);
 
+            if (waitBeforeCommit)
+            {
+               // we will wait all the pending operations to finish before we can add this
+               asyncAppendCommit();
+            }
+            else
+            {
+               storageManager.commit(id);
+            }
+
             state = State.COMMITTED;
          }
 
@@ -288,6 +301,39 @@
       }
    }
 
+   /**
+    * 
+    */
+   protected void asyncAppendCommit()
+   {
+      final OperationContext ctx = storageManager.getContext(); 
+      storageManager.afterCompleteOperations(new IOAsyncTask()
+      {
+         public void onError(int errorCode, String errorMessage)
+         {
+         }
+         
+         public void done()
+         {
+            OperationContext originalCtx = storageManager.getContext();
+            try
+            {
+               storageManager.setContext(ctx);
+               storageManager.commit(id, false);
+            }
+            catch (Exception e)
+            {
+               onError(HornetQException.IO_ERROR, e.getMessage());
+            }
+            finally
+            {
+               storageManager.setContext(originalCtx);
+            }
+         }
+      });
+      storageManager.lineUpContext();
+   }
+
    public void rollback() throws Exception
    {
       synchronized (timeoutLock)
@@ -361,14 +407,14 @@
       this.state = state;
    }
    
-   public boolean isPaging()
+   public boolean isWaitBeforeCommit()
    {
-      return paging;
+      return waitBeforeCommit;
    }
 
-   public void setPaging(boolean paging)
+   public void setWaitBeforeCommit(boolean waitBeforeCommit)
    {
-      this.paging = paging;
+      this.waitBeforeCommit = waitBeforeCommit;
    }
 
    public Xid getXid()

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -913,6 +913,8 @@
       assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
 
       store = server.getPagingManager().getPageStore(new SimpleString("TT"));
+      
+      conn.close();
 
       server.stop();
 

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingOrderTest.
+ * 
+ * PagingTest has a lot of tests already. I decided to create a newer one more specialized on Ordering and counters
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingSyncTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   private ServerLocator locator;
+
+   public PagingSyncTest(final String name)
+   {
+      super(name);
+   }
+
+   public PagingSyncTest()
+   {
+      super();
+   }
+
+   // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PagingTest.class);
+
+   private static final int RECEIVE_TIMEOUT = 30000;
+
+   private static final int PAGE_MAX = 100 * 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      locator = createInVMNonHALocator();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      locator.close();
+
+      super.tearDown();
+   }
+   public void testOrder1() throws Throwable
+   {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setClientFailureCheckPeriod(1000);
+         locator.setConnectionTTL(2000);
+         locator.setReconnectAttempts(0);
+
+         locator.setBlockOnNonDurableSend(false);
+         locator.setBlockOnDurableSend(false);
+         locator.setBlockOnAcknowledge(false);
+         locator.setConsumerWindowSize(1024 * 1024);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         Queue queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+         }
+
+         session.commit();
+
+         session.close();
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -1059,5 +1059,23 @@
          return 0;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion, boolean)
+       */
+      public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+       */
+      public void lineUpContex(IOCompletion callback)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -1671,6 +1671,24 @@
          return getContext();
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
+       */
+      public void commit(long txID, boolean lineUpContext) throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+       */
+      public void lineUpContext()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2011-04-04 20:41:34 UTC (rev 10449)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2011-04-04 21:19:27 UTC (rev 10450)
@@ -323,38 +323,38 @@
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#isPaging()
+       * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
        */
-      public boolean isPaging()
+      public boolean isContainsPersistent()
       {
          // TODO Auto-generated method stub
          return false;
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#setPaging(boolean)
+       * @see org.hornetq.core.transaction.Transaction#getAllOperations()
        */
-      public void setPaging(boolean paging)
+      public List<TransactionOperation> getAllOperations()
       {
-         // TODO Auto-generated method stub
-         
+         return null;
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
+       * @see org.hornetq.core.transaction.Transaction#isWaitBeforeCommit()
        */
-      public boolean isContainsPersistent()
+      public boolean isWaitBeforeCommit()
       {
          // TODO Auto-generated method stub
          return false;
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#getAllOperations()
+       * @see org.hornetq.core.transaction.Transaction#setWaitBeforeCommit(boolean)
        */
-      public List<TransactionOperation> getAllOperations()
+      public void setWaitBeforeCommit(boolean waitBeforeCommit)
       {
-         return null;
+         // TODO Auto-generated method stub
+         
       }
 
    }



More information about the hornetq-commits mailing list