[hornetq-commits] JBoss hornetq SVN: r10637 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed May 11 23:43:25 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-11 23:43:25 -0400 (Wed, 11 May 2011)
New Revision: 10637

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
Log:
improvement on PrintPages

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-05-12 03:24:26 UTC (rev 10636)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java	2011-05-12 03:43:25 UTC (rev 10637)
@@ -26,6 +26,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -40,6 +41,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -71,11 +73,12 @@
       if (arg.length != 2)
       {
          System.err.println("Usage: PrintPages <page foler> <journal folder>");
+         System.exit(-1);
       }
       try
       {
 
-         Map<Long, Set<PagePosition>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+         Pair<Map<Long, Set<PagePosition>>, Set<Long>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
 
          ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
          final ExecutorService executor = Executors.newFixedThreadPool(10);
@@ -116,7 +119,7 @@
                for (PagedMessage msg : msgs)
                {
                   msg.initMessage(sm);
-                  System.out.print("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+                  System.out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage());
                   System.out.print(",Queues = ");
                   long q[] = msg.getQueueIDs();
                   for (int i = 0; i < q.length; i++)
@@ -127,7 +130,7 @@
 
                      boolean acked = false;
 
-                     Set<PagePosition> positions = cursorACKs.get(q[i]);
+                     Set<PagePosition> positions = cursorACKs.a.get(q[i]);
                      if (positions != null)
                      {
                         acked = positions.contains(posCheck);
@@ -143,6 +146,10 @@
                         System.out.print(",");
                      }
                   }
+                  if (msg.getTransactionID() != 0 && ! cursorACKs.b.contains(msg.getTransactionID()));
+                  {
+                     System.out.print(", **PG_TX_NOT_FOUND**");
+                  }
                   System.out.println();
                   msgID++;
                }
@@ -164,7 +171,7 @@
     * @return
     * @throws Exception
     */
-   protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String journalLocation) throws Exception
+   protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(final String journalLocation) throws Exception
    {
       SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
 
@@ -188,14 +195,17 @@
       messagesJournal.load(records, txs, null);
 
       Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+      
+      Set<Long> pgTXs = new HashSet<Long>();
 
       for (RecordInfo record : records)
       {
+         byte[] data = record.data;
+
+         HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
          if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
          {
-            byte[] data = record.data;
-
-            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
             CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
             encoding.decode(buff);
 
@@ -209,8 +219,23 @@
 
             set.add(encoding.position);
          }
+         else if (record.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
+         {
+            if (record.isUpdate)
+            {
+               PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+               pageUpdate.decode(buff);
+               pgTXs.add(pageUpdate.pageTX);
+            }
+            else
+            {
+               pgTXs.add(record.id);
+            }
+         }
       }
-      return cursorRecords;
+      
+      return new Pair<Map<Long, Set<PagePosition>>, Set<Long>>(cursorRecords, pgTXs);
    }
 
    // Package protected ---------------------------------------------



More information about the hornetq-commits mailing list