Author: clebert.suconic(a)jboss.com
Date: 2011-04-01 23:53:14 -0400 (Fri, 01 Apr 2011)
New Revision: 10439
Modified:
trunk/src/main/org/hornetq/core/paging/PrintPages.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Tool to Print Page Information - used to debug customer's data
Modified: trunk/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-01 22:16:46 UTC (rev
10438)
+++ trunk/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-02 03:53:14 UTC (rev
10439)
@@ -13,16 +13,33 @@
package org.hornetq.core.paging;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import
org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -48,29 +65,36 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
- public static void main(String arg[])
+
+ public static void main(final String arg[])
{
+ if (arg.length != 2)
+ {
+ System.err.println("Usage: PrintPages <page foler> <journal
folder>");
+ }
try
{
+
+ Map<Long, Set<PagePosition>> cursorACKs =
PrintPages.loadCursorACKs(arg[1]);
+
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
final ExecutorService executor = Executors.newFixedThreadPool(10);
ExecutorFactory execfactory = new ExecutorFactory()
{
-
+
public Executor getExecutor()
{
return executor;
}
};
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l,
scheduled, execfactory, false);
- HierarchicalRepository<AddressSettings> addressSettingsRepository = new
HierarchicalObjectRepository<AddressSettings>();
+ HierarchicalRepository<AddressSettings> addressSettingsRepository = new
HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
StorageManager sm = new NullStorageManager();
PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm,
addressSettingsRepository);
-
+
manager.start();
-
+
SimpleString stores[] = manager.getStoreNames();
for (SimpleString store : stores)
@@ -79,28 +103,55 @@
System.out.println("Exploring store " + store);
PagingStore pgStore = manager.getPageStore(store);
int pgid = (int)pgStore.getFirstPage();
- for (int pg = 0 ; pg < pgStore.getNumberOfPages(); pg++)
+ for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
{
System.out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read();
page.close();
-
+
int msgID = 0;
-
+
for (PagedMessage msg : msgs)
{
msg.initMessage(sm);
- System.out.println("pg=" + pg + ", msg=" + msgID +
"=" + msg.getMessage());
+ System.out.print("pg=" + pg + ", msg=" + msgID +
"=" + msg.getMessage());
+ System.out.print(",Queues = ");
+ long q[] = msg.getQueueIDs();
+ for (int i = 0; i < q.length; i++)
+ {
+ System.out.print(q[i]);
+
+ PagePosition posCheck = new PagePositionImpl(pgid, msgID);
+
+ boolean acked = false;
+
+ Set<PagePosition> positions = cursorACKs.get(q[i]);
+ if (positions != null)
+ {
+ acked = positions.contains(posCheck);
+ }
+
+ if (acked)
+ {
+ System.out.print(" (ACK)");
+ }
+
+ if (i + 1 < q.length)
+ {
+ System.out.print(",");
+ }
+ }
+ System.out.println();
msgID++;
}
-
- pgid ++;
-
+
+ pgid++;
+
}
}
-
+
}
catch (Exception e)
{
@@ -108,6 +159,60 @@
}
}
+ /**
+ * @param journalLocation
+ * @return
+ * @throws Exception
+ */
+ protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String
journalLocation) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+
+ // Will use only default values. The load function should adapt to anything
different
+ ConfigurationImpl defaultValues = new ConfigurationImpl();
+
+ JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
+ defaultValues.getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+
+ messagesJournal.start();
+
+ ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> txs = new
ArrayList<PreparedTransactionInfo>();
+
+ messagesJournal.load(records, txs, null);
+
+ Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long,
Set<PagePosition>>();
+
+ for (RecordInfo record : records)
+ {
+ if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+ if (set == null)
+ {
+ set = new HashSet<PagePosition>();
+ cursorRecords.put(encoding.queueID, set);
+ }
+
+ set.add(encoding.position);
+ }
+ }
+ return cursorRecords;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-01
22:16:46 UTC (rev 10438)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-02
03:53:14 UTC (rev 10439)
@@ -2696,7 +2696,7 @@
int deliveryCount;
}
- private static final class CursorAckRecordEncoding implements EncodingSupport
+ public static final class CursorAckRecordEncoding implements EncodingSupport
{
public CursorAckRecordEncoding(final long queueID, final PagePosition position)
{
@@ -2718,9 +2718,9 @@
return "CursorAckRecordEncoding [queueID=" + queueID + ",
position=" + position + "]";
}
- long queueID;
+ public long queueID;
- PagePosition position;
+ public PagePosition position;
/* (non-Javadoc)
* @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()