[hornetq-commits] JBoss hornetq SVN: r10637 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed May 11 23:43:25 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-11 23:43:25 -0400 (Wed, 11 May 2011)
New Revision: 10637
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
Log:
improvement on PrintPages
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-05-12 03:24:26 UTC (rev 10636)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PrintPages.java 2011-05-12 03:43:25 UTC (rev 10637)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -40,6 +41,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -71,11 +73,12 @@
if (arg.length != 2)
{
System.err.println("Usage: PrintPages <page foler> <journal folder>");
+ System.exit(-1);
}
try
{
- Map<Long, Set<PagePosition>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+ Pair<Map<Long, Set<PagePosition>>, Set<Long>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
final ExecutorService executor = Executors.newFixedThreadPool(10);
@@ -116,7 +119,7 @@
for (PagedMessage msg : msgs)
{
msg.initMessage(sm);
- System.out.print("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+ System.out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage());
System.out.print(",Queues = ");
long q[] = msg.getQueueIDs();
for (int i = 0; i < q.length; i++)
@@ -127,7 +130,7 @@
boolean acked = false;
- Set<PagePosition> positions = cursorACKs.get(q[i]);
+ Set<PagePosition> positions = cursorACKs.a.get(q[i]);
if (positions != null)
{
acked = positions.contains(posCheck);
@@ -143,6 +146,10 @@
System.out.print(",");
}
}
+ if (msg.getTransactionID() != 0 && ! cursorACKs.b.contains(msg.getTransactionID()));
+ {
+ System.out.print(", **PG_TX_NOT_FOUND**");
+ }
System.out.println();
msgID++;
}
@@ -164,7 +171,7 @@
* @return
* @throws Exception
*/
- protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String journalLocation) throws Exception
+ protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(final String journalLocation) throws Exception
{
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
@@ -188,14 +195,17 @@
messagesJournal.load(records, txs, null);
Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+ Set<Long> pgTXs = new HashSet<Long>();
for (RecordInfo record : records)
{
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
{
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
@@ -209,8 +219,23 @@
set.add(encoding.position);
}
+ else if (record.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
+ {
+ if (record.isUpdate)
+ {
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+ pageUpdate.decode(buff);
+ pgTXs.add(pageUpdate.pageTX);
+ }
+ else
+ {
+ pgTXs.add(record.id);
+ }
+ }
}
- return cursorRecords;
+
+ return new Pair<Map<Long, Set<PagePosition>>, Set<Long>>(cursorRecords, pgTXs);
}
// Package protected ---------------------------------------------
More information about the hornetq-commits
mailing list