Author: clebert.suconic(a)jboss.com
Date: 2011-03-21 12:18:40 -0400 (Mon, 21 Mar 2011)
New Revision: 10347
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Improving the tool used to debug the journal when investigating data issues
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-18
12:10:55 UTC (rev 10346)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-21
16:18:40 UTC (rev 10347)
@@ -2785,7 +2785,13 @@
private static String describeRecord(RecordInfo info)
{
- return "recordID=" + info.id + ";userRecordType=" +
info.userRecordType + ";isUpdate=" + info.isUpdate + ";" +
newObjectEncoding(info);
+ return "recordID=" + info.id +
+ ";userRecordType=" +
+ info.userRecordType +
+ ";isUpdate=" +
+ info.isUpdate +
+ ";" +
+ newObjectEncoding(info);
}
private static String describeRecord(RecordInfo info, Object o)
@@ -2831,13 +2837,7 @@
{
final RefEncoding encoding = new RefEncoding();
encoding.decode(buffer);
- return new Object()
- {
- public String toString()
- {
- return "ACK;" + encoding;
- }
- };
+ return new AckDescribe(encoding);
}
case UPDATE_DELIVERY_COUNT:
@@ -2942,6 +2942,7 @@
return null;
}
}
+
private static class ReferenceDescribe
{
RefEncoding refEncoding;
@@ -2950,12 +2951,30 @@
{
this.refEncoding = refEncoding;
}
+
public String toString()
{
+ return "ACK;" + refEncoding;
+ }
+
+ }
+
+ private static class AckDescribe
+ {
+ RefEncoding refEncoding;
+
+ public AckDescribe(RefEncoding refEncoding)
+ {
+ this.refEncoding = refEncoding;
+ }
+
+ public String toString()
+ {
return "AddRef;" + refEncoding;
}
}
+
private static class MessageDescribe
{
public MessageDescribe(Message msg)
@@ -2980,18 +2999,18 @@
if (value instanceof byte[])
{
buffer.append(prop + "=" + Arrays.toString((byte[])value) +
",");
-
+
}
else
{
buffer.append(prop + "=" + value + ",");
}
}
-
+
buffer.append("#properties = " + properties.size());
buffer.append("]");
-
+
buffer.append(" - " + msg.toString());
return buffer.toString();
@@ -3078,51 +3097,51 @@
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo
recordInfo) throws Exception
{
- out.println("operation@UpdateTX,txID@" + transactionID +
"," + describeRecord(recordInfo));
+ out.println("operation@UpdateTX;txID=" + transactionID +
"," + describeRecord(recordInfo));
}
public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
{
- out.println("operation@Update," + describeRecord(recordInfo));
+ out.println("operation@Update;" + describeRecord(recordInfo));
}
public void onReadRollbackRecord(final long transactionID) throws Exception
{
- out.println("operation@Rollback,txID@" + transactionID);
+ out.println("operation@Rollback;txID=" + transactionID);
}
public void onReadPrepareRecord(final long transactionID, final byte[]
extraData, final int numberOfRecords) throws Exception
{
- out.println("operation@Prepare,txID@" + transactionID +
- ",numberOfRecords@" +
+ out.println("operation@Prepare,txID=" + transactionID +
+ ",numberOfRecords=" +
numberOfRecords +
- ",extraData@" +
+ ",extraData=" +
encode(extraData));
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo
recordInfo) throws Exception
{
- out.println("operation@DeleteRecordTX,txID@" + transactionID +
"," + describeRecord(recordInfo));
+ out.println("operation@DeleteRecordTX;txID=" + transactionID +
"," + describeRecord(recordInfo));
}
public void onReadDeleteRecord(final long recordID) throws Exception
{
- out.println("operation@DeleteRecord,id@" + recordID);
+ out.println("operation@DeleteRecord;recordID=" + recordID);
}
public void onReadCommitRecord(final long transactionID, final int
numberOfRecords) throws Exception
{
- out.println("operation@Commit,txID@" + transactionID +
",numberOfRecords@" + numberOfRecords);
+ out.println("operation@Commit;txID=" + transactionID +
",numberOfRecords=" + numberOfRecords);
}
public void onReadAddRecordTX(final long transactionID, final RecordInfo
recordInfo) throws Exception
{
- out.println("operation@AddRecordTX,txID@" + transactionID +
"," + describeRecord(recordInfo));
+ out.println("operation@AddRecordTX;txID=" + transactionID +
"," + describeRecord(recordInfo));
}
public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
{
- out.println("operation@AddRecord," +
describeRecord(recordInfo));
+ out.println("operation@AddRecord;" +
describeRecord(recordInfo));
}
public void markAsDataFile(final JournalFile file)
@@ -3139,7 +3158,7 @@
List<PreparedTransactionInfo> preparedTransactions = new
LinkedList<PreparedTransactionInfo>();
journal.start();
-
+
final StringBuffer bufferFailingTransactions = new StringBuffer();
int messageCount = 0;
@@ -3168,24 +3187,37 @@
for (RecordInfo info : records)
{
Object o = newObjectEncoding(info);
- if(info.getUserRecordType() == 31)
+ if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
{
messageCount++;
}
- else if(info.getUserRecordType() == 32)
+ else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
{
- ReferenceDescribe ref = (ReferenceDescribe) o;
+ ReferenceDescribe ref = (ReferenceDescribe)o;
Integer count = messageRefCounts.get(ref.refEncoding.queueID);
- if(count == null)
+ if (count == null)
{
count = 1;
messageRefCounts.put(ref.refEncoding.queueID, count);
}
else
{
- messageRefCounts.put(ref.refEncoding.queueID, count+1);
+ messageRefCounts.put(ref.refEncoding.queueID, count + 1);
}
}
+ else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+ {
+ AckDescribe ref = (AckDescribe)o;
+ Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ messageRefCounts.put(ref.refEncoding.queueID, 0);
+ }
+ else
+ {
+ messageRefCounts.put(ref.refEncoding.queueID, count - 1);
+ }
+ }
out.println(describeRecord(info, o));
}
@@ -3199,22 +3231,22 @@
{
Object o = newObjectEncoding(info);
out.println("- " + describeRecord(info, o));
- if(info.getUserRecordType() == 31)
+ if (info.getUserRecordType() == 31)
{
preparedMessageCount++;
}
- else if(info.getUserRecordType() == 32)
+ else if (info.getUserRecordType() == 32)
{
- ReferenceDescribe ref = (ReferenceDescribe) o;
+ ReferenceDescribe ref = (ReferenceDescribe)o;
Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
- if(count == null)
+ if (count == null)
{
count = 1;
preparedMessageRefCount.put(ref.refEncoding.queueID, count);
}
else
{
- preparedMessageRefCount.put(ref.refEncoding.queueID, count+1);
+ preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
}
}
}
@@ -3224,18 +3256,16 @@
out.println("- " + describeRecord(info) + " <marked to
delete>");
}
}
-
+
String missingTX = bufferFailingTransactions.toString();
-
+
if (missingTX.length() > 0)
{
out.println();
out.println("### Failed Transactions (Missing commit/prepare/rollback
record) ###");
}
-
-
+
out.println(bufferFailingTransactions.toString());
-
out.println("### Message Counts ###");
out.println("message count=" + messageCount);