[hornetq-commits] JBoss hornetq SVN: r10439 - in trunk/src/main/org/hornetq/core: persistence/impl/journal and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Apr 1 23:53:16 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-01 23:53:14 -0400 (Fri, 01 Apr 2011)
New Revision: 10439

Modified:
   trunk/src/main/org/hornetq/core/paging/PrintPages.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Tool to Print Page Information - used to debug customer's data

Modified: trunk/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PrintPages.java	2011-04-01 22:16:46 UTC (rev 10438)
+++ trunk/src/main/org/hornetq/core/paging/PrintPages.java	2011-04-02 03:53:14 UTC (rev 10439)
@@ -13,16 +13,33 @@
 
 package org.hornetq.core.paging;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -48,29 +65,36 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
-   
-   public static void main(String arg[])
+
+   public static void main(final String arg[])
    {
+      if (arg.length != 2)
+      {
+         System.err.println("Usage: PrintPages <page foler> <journal folder>");
+      }
       try
       {
+
+         Map<Long, Set<PagePosition>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+
          ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
          final ExecutorService executor = Executors.newFixedThreadPool(10);
          ExecutorFactory execfactory = new ExecutorFactory()
          {
-            
+
             public Executor getExecutor()
             {
                return executor;
             }
          };
          PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false);
-         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>(); 
+         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
          addressSettingsRepository.setDefault(new AddressSettings());
          StorageManager sm = new NullStorageManager();
          PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
-         
+
          manager.start();
-         
+
          SimpleString stores[] = manager.getStoreNames();
 
          for (SimpleString store : stores)
@@ -79,28 +103,55 @@
             System.out.println("Exploring store " + store);
             PagingStore pgStore = manager.getPageStore(store);
             int pgid = (int)pgStore.getFirstPage();
-            for (int pg = 0 ; pg < pgStore.getNumberOfPages(); pg++)
+            for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
             {
                System.out.println("*******   Page " + pgid);
                Page page = pgStore.createPage(pgid);
                page.open();
                List<PagedMessage> msgs = page.read();
                page.close();
-               
+
                int msgID = 0;
-               
+
                for (PagedMessage msg : msgs)
                {
                   msg.initMessage(sm);
-                  System.out.println("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+                  System.out.print("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+                  System.out.print(",Queues = ");
+                  long q[] = msg.getQueueIDs();
+                  for (int i = 0; i < q.length; i++)
+                  {
+                     System.out.print(q[i]);
+
+                     PagePosition posCheck = new PagePositionImpl(pgid, msgID);
+
+                     boolean acked = false;
+
+                     Set<PagePosition> positions = cursorACKs.get(q[i]);
+                     if (positions != null)
+                     {
+                        acked = positions.contains(posCheck);
+                     }
+
+                     if (acked)
+                     {
+                        System.out.print(" (ACK)");
+                     }
+
+                     if (i + 1 < q.length)
+                     {
+                        System.out.print(",");
+                     }
+                  }
+                  System.out.println();
                   msgID++;
                }
-               
-               pgid ++;
-               
+
+               pgid++;
+
             }
          }
-         
+
       }
       catch (Exception e)
       {
@@ -108,6 +159,60 @@
       }
    }
 
+   /** Loads the journal found at journalLocation and groups the positions of its ACKNOWLEDGE_CURSOR records by queue ID.
+    * @param journalLocation location handed to the NIOSequentialFileFactory used to read the journal files
+    * @return map from queue ID to the set of page positions decoded from that queue's ACKNOWLEDGE_CURSOR records; empty if none were loaded
+    * @throws Exception nothing is caught here, so any failure raised by the journal calls reaches the caller
+    */
+   protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String journalLocation) throws Exception
+   {
+      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+
+      // Only default configuration values are used. The load function should adapt to anything different
+      ConfigurationImpl defaultValues = new ConfigurationImpl();
+
+      JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
+                                                    defaultValues.getJournalMinFiles(),
+                                                    0,
+                                                    0,
+                                                    messagesFF,
+                                                    "hornetq-data",
+                                                    "hq",
+                                                    1);
+
+      messagesJournal.start(); // NOTE(review): this method never calls a matching stop on the journal - confirm that is acceptable for a one-shot tool
+
+      ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+      ArrayList<PreparedTransactionInfo> txs = new ArrayList<PreparedTransactionInfo>();
+
+      messagesJournal.load(records, txs, null); // txs is filled by load but not read again in this method
+
+      Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+      for (RecordInfo record : records)
+      {
+         if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR) // every other record type is skipped
+         {
+            byte[] data = record.data;
+
+            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+            CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+            encoding.decode(buff);
+
+            Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+            if (set == null) // first record seen for this queue ID: create its set
+            {
+               set = new HashSet<PagePosition>();
+               cursorRecords.put(encoding.queueID, set);
+            }
+
+            set.add(encoding.position);
+         }
+      }
+      return cursorRecords;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-01 22:16:46 UTC (rev 10438)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-02 03:53:14 UTC (rev 10439)
@@ -2696,7 +2696,7 @@
       int deliveryCount;
    }
 
-   private static final class CursorAckRecordEncoding implements EncodingSupport
+   public static final class CursorAckRecordEncoding implements EncodingSupport
    {
       public CursorAckRecordEncoding(final long queueID, final PagePosition position)
       {
@@ -2718,9 +2718,9 @@
          return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
       }
 
-      long queueID;
+      public long queueID;
 
-      PagePosition position;
+      public PagePosition position;
 
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()



More information about the hornetq-commits mailing list