[hornetq-commits] JBoss hornetq SVN: r10347 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Mar 21 12:18:42 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-21 12:18:40 -0400 (Mon, 21 Mar 2011)
New Revision: 10347

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Improve the journal debugging tool used when investigating data issues

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-03-18 12:10:55 UTC (rev 10346)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-03-21 16:18:40 UTC (rev 10347)
@@ -2785,7 +2785,13 @@
 
    private static String describeRecord(RecordInfo info)
    {
-      return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + newObjectEncoding(info);
+      return "recordID=" + info.id +
+             ";userRecordType=" +
+             info.userRecordType +
+             ";isUpdate=" +
+             info.isUpdate +
+             ";" +
+             newObjectEncoding(info);
    }
 
    private static String describeRecord(RecordInfo info, Object o)
@@ -2831,13 +2837,7 @@
          {
             final RefEncoding encoding = new RefEncoding();
             encoding.decode(buffer);
-            return new Object()
-            {
-               public String toString()
-               {
-                  return "ACK;" + encoding;
-               }
-            };
+            return new AckDescribe(encoding);
          }
 
          case UPDATE_DELIVERY_COUNT:
@@ -2942,6 +2942,7 @@
             return null;
       }
    }
+
    private static class ReferenceDescribe
    {
       RefEncoding refEncoding;
@@ -2950,12 +2951,30 @@
       {
          this.refEncoding = refEncoding;
       }
+
       public String toString()
       {
+         return "ACK;" + refEncoding;
+      }
+
+   }
+
+   private static class AckDescribe
+   {
+      RefEncoding refEncoding;
+
+      public AckDescribe(RefEncoding refEncoding)
+      {
+         this.refEncoding = refEncoding;
+      }
+
+      public String toString()
+      {
          return "AddRef;" + refEncoding;
       }
 
    }
+
    private static class MessageDescribe
    {
       public MessageDescribe(Message msg)
@@ -2980,18 +2999,18 @@
             if (value instanceof byte[])
             {
                buffer.append(prop + "=" + Arrays.toString((byte[])value) + ",");
-               
+
             }
             else
             {
                buffer.append(prop + "=" + value + ",");
             }
          }
-         
+
          buffer.append("#properties = " + properties.size());
 
          buffer.append("]");
-         
+
          buffer.append(" - " + msg.toString());
 
          return buffer.toString();
@@ -3078,51 +3097,51 @@
 
             public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
             {
-               out.println("operation at UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+               out.println("operation at UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo));
             }
 
             public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
             {
-               out.println("operation at Update," + describeRecord(recordInfo));
+               out.println("operation at Update;" + describeRecord(recordInfo));
             }
 
             public void onReadRollbackRecord(final long transactionID) throws Exception
             {
-               out.println("operation at Rollback,txID@" + transactionID);
+               out.println("operation at Rollback;txID=" + transactionID);
             }
 
             public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
             {
-               out.println("operation at Prepare,txID@" + transactionID +
-                           ",numberOfRecords@" +
+               out.println("operation at Prepare,txID=" + transactionID +
+                           ",numberOfRecords=" +
                            numberOfRecords +
-                           ",extraData@" +
+                           ",extraData=" +
                            encode(extraData));
             }
 
             public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
             {
-               out.println("operation at DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+               out.println("operation at DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
             }
 
             public void onReadDeleteRecord(final long recordID) throws Exception
             {
-               out.println("operation at DeleteRecord,id@" + recordID);
+               out.println("operation at DeleteRecord;recordID=" + recordID);
             }
 
             public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
             {
-               out.println("operation at Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
+               out.println("operation at Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords);
             }
 
             public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
             {
-               out.println("operation at AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+               out.println("operation at AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
             }
 
             public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
             {
-               out.println("operation at AddRecord," + describeRecord(recordInfo));
+               out.println("operation at AddRecord;" + describeRecord(recordInfo));
             }
 
             public void markAsDataFile(final JournalFile file)
@@ -3139,7 +3158,7 @@
       List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
 
       journal.start();
-      
+
       final StringBuffer bufferFailingTransactions = new StringBuffer();
 
       int messageCount = 0;
@@ -3168,24 +3187,37 @@
       for (RecordInfo info : records)
       {
          Object o = newObjectEncoding(info);
-         if(info.getUserRecordType() == 31)
+         if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
          {
             messageCount++;
          }
-         else if(info.getUserRecordType() == 32)
+         else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
          {
-            ReferenceDescribe ref = (ReferenceDescribe) o;
+            ReferenceDescribe ref = (ReferenceDescribe)o;
             Integer count = messageRefCounts.get(ref.refEncoding.queueID);
-            if(count == null)
+            if (count == null)
             {
                count = 1;
                messageRefCounts.put(ref.refEncoding.queueID, count);
             }
             else
             {
-               messageRefCounts.put(ref.refEncoding.queueID, count+1);
+               messageRefCounts.put(ref.refEncoding.queueID, count + 1);
             }
          }
+         else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+         {
+            AckDescribe ref = (AckDescribe)o;
+            Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+            if (count == null)
+            {
+               messageRefCounts.put(ref.refEncoding.queueID, 0);
+            }
+            else
+            {
+               messageRefCounts.put(ref.refEncoding.queueID, count - 1);
+            }
+         }
          out.println(describeRecord(info, o));
       }
 
@@ -3199,22 +3231,22 @@
          {
             Object o = newObjectEncoding(info);
             out.println("- " + describeRecord(info, o));
-            if(info.getUserRecordType() == 31)
+            if (info.getUserRecordType() == 31)
             {
                preparedMessageCount++;
             }
-            else if(info.getUserRecordType() == 32)
+            else if (info.getUserRecordType() == 32)
             {
-               ReferenceDescribe ref = (ReferenceDescribe) o;
+               ReferenceDescribe ref = (ReferenceDescribe)o;
                Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
-               if(count == null)
+               if (count == null)
                {
                   count = 1;
                   preparedMessageRefCount.put(ref.refEncoding.queueID, count);
                }
                else
                {
-                  preparedMessageRefCount.put(ref.refEncoding.queueID, count+1);
+                  preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
                }
             }
          }
@@ -3224,18 +3256,16 @@
             out.println("- " + describeRecord(info) + " <marked to delete>");
          }
       }
-      
+
       String missingTX = bufferFailingTransactions.toString();
-      
+
       if (missingTX.length() > 0)
       {
          out.println();
          out.println("### Failed Transactions (Missing commit/prepare/rollback record) ###");
       }
-      
-      
+
       out.println(bufferFailingTransactions.toString());
-      
 
       out.println("### Message Counts ###");
       out.println("message count=" + messageCount);



More information about the hornetq-commits mailing list