[hornetq-commits] JBoss hornetq SVN: r11615 - trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 28 09:00:40 EDT 2011


Author: borges
Date: 2011-10-28 09:00:40 -0400 (Fri, 28 Oct 2011)
New Revision: 11615

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Use locks on confirmPendingLargeMessage and confirmPendingLargeMessageTX; these methods were added on the merge from 2_2_EAP.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-10-28 13:00:26 UTC (rev 11614)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-10-28 13:00:40 UTC (rev 11615)
@@ -349,66 +349,66 @@
         return replicator != null;
     }
 
-    /**
-     * @param replicationManager
-     * @param pagingManager
-     * @throws HornetQException
-     */
+   /**
+    * @param replicationManager
+    * @param pagingManager
+    * @throws HornetQException
+    */
    @Override
-       public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
-    {
-        if (!started)
-            {
-                throw new IllegalStateException("JournalStorageManager must be started...");
-            }
-        assert replicationManager != null;
+   public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("JournalStorageManager must be started...");
+      }
+      assert replicationManager != null;
 
-        if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof JournalImpl))
-            {
-                throw new HornetQException(HornetQException.INTERNAL_ERROR,
-                                           "journals here are not JournalImpl. You can't set a replicator!");
-            }
-        JournalFile[] messageFiles = null;
-        JournalFile[] bindingsFiles = null;
+      if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof JournalImpl))
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR,
+                                    "journals here are not JournalImpl. You can't set a replicator!");
+      }
+      JournalFile[] messageFiles = null;
+      JournalFile[] bindingsFiles = null;
 
-        final Journal localMessageJournal = messageJournal;
-        final Journal localBindingsJournal = bindingsJournal;
+      final Journal localMessageJournal = messageJournal;
+      final Journal localBindingsJournal = bindingsJournal;
 
-        Map<String, Long> largeMessageFilesToSync;
-        Map<SimpleString, Collection<Integer>> pageFilesToSync;
-        storageManagerLock.writeLock().lock();
+      Map<String, Long> largeMessageFilesToSync;
+      Map<SimpleString, Collection<Integer>> pageFilesToSync;
+      storageManagerLock.writeLock().lock();
       try
-          {
-              replicator = replicationManager;
-              localMessageJournal.synchronizationLock();
-              localBindingsJournal.synchronizationLock();
+      {
+         replicator = replicationManager;
+         localMessageJournal.synchronizationLock();
+         localBindingsJournal.synchronizationLock();
          try
-             {
+         {
             pagingManager.lock();
             try
-                {
-                    messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
-                    bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
-                    pageFilesToSync = getPageInformationForSync(pagingManager);
-                    largeMessageFilesToSync = getLargeMessageInformation();
-                }
+            {
+               messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+               bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+               pageFilesToSync = getPageInformationForSync(pagingManager);
+               largeMessageFilesToSync = getLargeMessageInformation();
+            }
             finally
-                {
+            {
                pagingManager.unlock();
-                }
-             }
+            }
+         }
          finally
-             {
-                 localMessageJournal.synchronizationUnlock();
-                 localBindingsJournal.synchronizationUnlock();
-             }
+         {
+            localMessageJournal.synchronizationUnlock();
+            localBindingsJournal.synchronizationUnlock();
+         }
          bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
          messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
-          }
+      }
       finally
-          {
-              storageManagerLock.writeLock().unlock();
-          }
+      {
+         storageManagerLock.writeLock().unlock();
+      }
 
       sendJournalFile(messageFiles, JournalContent.MESSAGES);
       sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
@@ -416,36 +416,35 @@
       sendPagesToBackup(pageFilesToSync, pagingManager);
 
       storageManagerLock.writeLock().lock();
-         try
-             {
-                 replicator.sendSynchronizationDone();
-                 // XXX HORNETQ-720 SEND a compare journals message?
-             }
-         finally
-             {
-                 storageManagerLock.writeLock().unlock();
-             }
-    }
+      try
+      {
+         replicator.sendSynchronizationDone();
+         // XXX HORNETQ-720 SEND a compare journals message?
+      }
+      finally
+      {
+         storageManagerLock.writeLock().unlock();
+      }
+   }
 
 
+   /**
+    * @param pageFilesToSync
+    * @throws Exception
+    */
+   private void sendPagesToBackup(Map<SimpleString, Collection<Integer>> pageFilesToSync, PagingManager manager)
+      throws Exception
+   {
+      for (Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet())
+      {
+         if (!started)
+            return;
+         PagingStore store = manager.getPageStore(entry.getKey());
+         store.sendPages(replicator, entry.getValue());
+      }
+   }
 
     /**
-     * @param pageFilesToSync
-     * @throws Exception
-     */
-    private void sendPagesToBackup(Map<SimpleString, Collection<Integer>> pageFilesToSync, PagingManager manager)
-            throws Exception
-    {
-        for (Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet())
-            {
-                if (!started)
-                    return;
-                PagingStore store = manager.getPageStore(entry.getKey());
-                store.sendPages(replicator, entry.getValue());
-            }
-    }
-
-    /**
      * @param pagingManager
      * @return
      * @throws Exception
@@ -573,16 +572,18 @@
             }
     }
 
-    /* (non-Javadoc)
-     * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString, int, org.hornetq.api.core.buffers.ChannelBuffer)
-     */
-    public void pageWrite(final PagedMessage message, final int pageNumber)
-    {
-        if (isReplicated())
-            {
-                replicator.pageWrite(message, pageNumber);
-            }
-    }
+   /*
+    * (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString,
+    * int, org.hornetq.api.core.buffers.ChannelBuffer)
+    */
+   public void pageWrite(final PagedMessage message, final int pageNumber)
+   {
+      if (isReplicated())
+      {
+         replicator.pageWrite(message, pageNumber);
+      }
+   }
 
     /* (non-Javadoc)
      * @see org.hornetq.core.persistence.StorageManager#getContext()
@@ -637,58 +638,58 @@
         return new LargeServerMessageImpl(this);
     }
 
-    protected final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes)
-            throws Exception
-    {
-        readLock();
+   protected final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes)
+      throws Exception
+   {
+      readLock();
       try
-          {
-              file.position(file.size());
+      {
+         file.position(file.size());
 
-              file.writeDirect(ByteBuffer.wrap(bytes), false);
+         file.writeDirect(ByteBuffer.wrap(bytes), false);
 
-              if (isReplicated())
-                  {
-                      replicator.largeMessageWrite(messageId, bytes);
-                  }
-          }
+         if (isReplicated())
+         {
+            replicator.largeMessageWrite(messageId, bytes);
+         }
+      }
       finally
-          {
-              readUnLock();
-          }
-    }
+      {
+         readUnLock();
+      }
+   }
 
    public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception
-    {
-        readLock();
+   {
+      readLock();
       try
-          {
-              if (isReplicated())
-                  {
-                      replicator.largeMessageBegin(id);
-                  }
+      {
+         if (isReplicated())
+         {
+            replicator.largeMessageBegin(id);
+         }
 
-              LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
+         LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
 
-              largeMessage.copyHeadersAndProperties(message);
+         largeMessage.copyHeadersAndProperties(message);
 
-              largeMessage.setMessageID(id);
+         largeMessage.setMessageID(id);
 
-              if (largeMessage.isDurable())
-                  {
-                      // We store a marker on the journal that the large file is pending
-                      long pendingRecordID = storePendingLargeMessage(id);
+         if (largeMessage.isDurable())
+         {
+            // We store a marker on the journal that the large file is pending
+            long pendingRecordID = storePendingLargeMessage(id);
 
-                      largeMessage.setPendingRecordID(pendingRecordID);
-                  }
+            largeMessage.setPendingRecordID(pendingRecordID);
+         }
 
-              return largeMessage;
+         return largeMessage;
       }
-              finally
-                  {
-                      readUnLock();
-                  }
-          }
+      finally
+      {
+         readUnLock();
+      }
+   }
 
       // Non transactional operations
 
@@ -713,100 +714,104 @@
       }
    }
 
-      public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
+   public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
+   {
+      readLock();
+      try
       {
-          installLargeMessageConfirmationOnTX(tx, recordID);
-          messageJournal.appendDeleteRecordTransactional(tx.getID(),
-                                                         recordID,
-                                                         new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+         installLargeMessageConfirmationOnTX(tx, recordID);
+         messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID,
+                                                     new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
       }
-
-      /** We don't need messageID now but we are likely to need it we ever decide to support a database */
-      public void confirmPendingLargeMessage(long recordID) throws Exception
+      finally
       {
-          messageJournal.appendDeleteRecord(recordID, true, getContext());
+         readUnLock();
       }
+   }
 
-      public void storeMessage(final ServerMessage message) throws Exception
-      {
-          if (message.getMessageID() <= 0)
-              {
-                  // Sanity check only... this shouldn't happen unless there is a bug
-                  throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
-              }
-
-          readLock();
+   /** We don't need messageID now but we are likely to need it we ever decide to support a database */
+   public void confirmPendingLargeMessage(long recordID) throws Exception
+   {
+      readLock();
       try
-          {
-              // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
-
-              if (message.isLargeMessage())
-                  {
-                      messageJournal.appendAddRecord(message.getMessageID(),
-                                                     JournalStorageManager.ADD_LARGE_MESSAGE,
-                                                     new LargeMessageEncoding((LargeServerMessage)message),
-                                                     false,
-                                                     getContext(false));
-                  }
-              else
-                  {
-                      messageJournal.appendAddRecord(message.getMessageID(),
-                                                     JournalStorageManager.ADD_MESSAGE,
-                                                     message,
-                                                     false,
-                                                     getContext(false));
-                  }
-          }
+      {
+         messageJournal.appendDeleteRecord(recordID, true, getContext());
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
-
+   public void storeMessage(final ServerMessage message) throws Exception
+   {
+      if (message.getMessageID() <= 0)
       {
-          readLock();
+         // Sanity check only... this shouldn't happen unless there is a bug
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+      }
+
+      readLock();
       try
-          {
-              messageJournal.appendUpdateRecord(messageID,
-                                                JournalStorageManager.ADD_REF,
-                                                new RefEncoding(queueID),
-                                                last && syncNonTransactional,
-                                                getContext(last && syncNonTransactional));
-          }
+      {
+         // Note that we don't sync, the add reference that comes immediately after will sync if
+         // appropriate
+
+         if (message.isLargeMessage())
+         {
+            messageJournal.appendAddRecord(message.getMessageID(), JournalStorageManager.ADD_LARGE_MESSAGE,
+                                           new LargeMessageEncoding((LargeServerMessage)message), false,
+                                           getContext(false));
+         }
+         else
+         {
+            messageJournal.appendAddRecord(message.getMessageID(), JournalStorageManager.ADD_MESSAGE, message, false,
+                                           getContext(false));
+         }
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      private void readLock()
+   public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
+   {
+      readLock();
+      try
       {
-          storageManagerLock.readLock().lock();
+         messageJournal.appendUpdateRecord(messageID, JournalStorageManager.ADD_REF, new RefEncoding(queueID), last &&
+                  syncNonTransactional, getContext(last && syncNonTransactional));
       }
-
-      private void readUnLock()
+      finally
       {
-          storageManagerLock.readLock().unlock();
+         readUnLock();
       }
+   }
 
-      public void storeAcknowledge(final long queueID, final long messageID) throws Exception
-      {
-          readLock();
+   private void readLock()
+   {
+      storageManagerLock.readLock().lock();
+   }
+
+   private void readUnLock()
+   {
+      storageManagerLock.readLock().unlock();
+   }
+
+   public void storeAcknowledge(final long queueID, final long messageID) throws Exception
+   {
+      readLock();
       try
-          {
-              messageJournal.appendUpdateRecord(messageID,
-                                                JournalStorageManager.ACKNOWLEDGE_REF,
-                                                new RefEncoding(queueID),
-                                                syncNonTransactional,
-                                                getContext(syncNonTransactional));
-          }
+      {
+         messageJournal.appendUpdateRecord(messageID, JournalStorageManager.ACKNOWLEDGE_REF, new RefEncoding(queueID),
+                                           syncNonTransactional, getContext(syncNonTransactional));
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
       public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
       {
@@ -882,135 +887,125 @@
           }
       }
 
-      public void deleteDuplicateID(final long recordID) throws Exception
-      {
-          readLock();
+   public void deleteDuplicateID(final long recordID) throws Exception
+   {
+      readLock();
       try
-          {
-              messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
-          }
+      {
+         messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
       // Transactional operations
 
-      public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
+   public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
+   {
+      if (message.getMessageID() <= 0)
       {
-          if (message.getMessageID() <= 0)
-              {
-                  throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
-              }
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+      }
 
-          readLock();
+      readLock();
       try
-          {
-              if (message.isLargeMessage())
-                  {
-                      messageJournal.appendAddRecordTransactional(txID,
-                                                                  message.getMessageID(),
-                                                                  JournalStorageManager.ADD_LARGE_MESSAGE,
-                                                                  new LargeMessageEncoding(((LargeServerMessage)message)));
-                  }
-              else
-                  {
-                      messageJournal.appendAddRecordTransactional(txID,
-                                                                  message.getMessageID(),
-                                                                  JournalStorageManager.ADD_MESSAGE,
-                                                                  message);
-                  }
+      {
+         if (message.isLargeMessage())
+         {
+            messageJournal.appendAddRecordTransactional(txID, message.getMessageID(),
+                                                        JournalStorageManager.ADD_LARGE_MESSAGE,
+                                                        new LargeMessageEncoding(((LargeServerMessage)message)));
+         }
+         else
+         {
+            messageJournal.appendAddRecordTransactional(txID, message.getMessageID(),
+                                                        JournalStorageManager.ADD_MESSAGE, message);
+         }
 
-          }
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
-      {
-          readLock();
+   public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
+   {
+      readLock();
       try
-          {
-              pageTransaction.setRecordID(generateUniqueID());
-
-              messageJournal.appendAddRecordTransactional(txID,
-                                                          pageTransaction.getRecordID(),
-                                                          JournalStorageManager.PAGE_TRANSACTION,
-                                                          pageTransaction);
-          }
+      {
+         pageTransaction.setRecordID(generateUniqueID());
+         messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(),
+                                                     JournalStorageManager.PAGE_TRANSACTION, pageTransaction);
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
-      {
-          readLock();
+   public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages)
+      throws Exception
+   {
+      readLock();
       try
-          {
-              messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(),
-                                                             JournalStorageManager.PAGE_TRANSACTION,
-                                                             new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
-                                                                                      depages));
-          }
+      {
+         messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(),
+                                                        JournalStorageManager.PAGE_TRANSACTION,
+                                                        new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+                                                                                 depages));
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
-      {
-          readLock();
+   public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
+   {
+      readLock();
       try
-          {
-              messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
-                                                JournalStorageManager.PAGE_TRANSACTION,
-                                                new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
-                                                syncNonTransactional,
-                                                getContext(syncNonTransactional));
-          }
+      {
+         messageJournal.appendUpdateRecord(pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION,
+                                           new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
+                                           syncNonTransactional, getContext(syncNonTransactional));
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
-      {
-          readLock();
+   public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
+   {
+      readLock();
       try
-          {
-              messageJournal.appendUpdateRecordTransactional(txID,
-                                                             messageID,
-                                                             JournalStorageManager.ADD_REF,
-                                                             new RefEncoding(queueID));
-          }
+      {
+         messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalStorageManager.ADD_REF,
+                                                        new RefEncoding(queueID));
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
-      {
-          readLock();
+   public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID)
+      throws Exception
+   {
+      readLock();
       try
-          {
-              messageJournal.appendUpdateRecordTransactional(txID,
-                                                             messageID,
-                                                             JournalStorageManager.ACKNOWLEDGE_REF,
-                                                             new RefEncoding(queueID));
-          }
+      {
+         messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalStorageManager.ACKNOWLEDGE_REF,
+                                                        new RefEncoding(queueID));
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
@@ -1201,48 +1196,46 @@
           }
       }
 
-      public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
-      {
-          readLock();
+   public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
+   {
+      readLock();
       try
-          {
-              messageJournal.appendDeleteRecordTransactional(txID, recordID);
-          }
+      {
+         messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      }
       finally
-          {
-              readUnLock();
-          }
+      {
+         readUnLock();
       }
+   }
 
-      // Other operations
+   // Other operations
 
-      public void updateDeliveryCount(final MessageReference ref) throws Exception
+   public void updateDeliveryCount(final MessageReference ref) throws Exception
+   {
+      // no need to store if it's the same value
+      // otherwise the journal will get OME in case of lots of redeliveries
+      if (ref.getDeliveryCount() != ref.getPersistedCount())
       {
-          // no need to store if it's the same value
-          // otherwise the journal will get OME in case of lots of redeliveries
-          if (ref.getDeliveryCount() != ref.getPersistedCount())
-              {
-                  ref.setPersistedCount(ref.getDeliveryCount());
-                  DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
-                                                                                           ref.getDeliveryCount());
+         ref.setPersistedCount(ref.getDeliveryCount());
+         DeliveryCountUpdateEncoding updateInfo =
+                  new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
 
-                  readLock();
+         readLock();
          try
-             {
-                 messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
-                                                   JournalStorageManager.UPDATE_DELIVERY_COUNT,
-                                                   updateInfo,
+         {
+            messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+                                              JournalStorageManager.UPDATE_DELIVERY_COUNT, updateInfo,
 
-                                                   syncNonTransactional,
-                                                   getContext(syncNonTransactional));
-             }
+                                              syncNonTransactional, getContext(syncNonTransactional));
+         }
 
          finally
-             {
-                 readUnLock();
-             }
-              }
+         {
+            readUnLock();
+         }
       }
+   }
 
       public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
       {
@@ -1928,26 +1921,22 @@
           }
     }
 
-    /* (non-Javadoc)
-     * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
-     */
-    public long storePageCounter(long txID, long queueID, long value) throws Exception
-    {
-        readLock();
+   @Override
+   public long storePageCounter(long txID, long queueID, long value) throws Exception
+   {
+      readLock();
       try
-          {
-              final long recordID = idGenerator.generateID();
-              messageJournal.appendAddRecordTransactional(txID,
-                                                          recordID,
-                                                          JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
-                                                          new PageCountRecord(queueID, value));
-              return recordID;
-          }
+      {
+         final long recordID = idGenerator.generateID();
+         messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+                                                     new PageCountRecord(queueID, value));
+         return recordID;
+      }
       finally
-          {
-              readUnLock();
-          }
-    }
+      {
+         readUnLock();
+      }
+   }
 
     /* (non-Javadoc)
      * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
@@ -2184,11 +2173,11 @@
          return;
       }
       Runnable deleteAction = new Runnable()
-            {
-                public void run()
-                {
+      {
+         public void run()
+         {
             try
-                {
+            {
                readLock();
                try
                {
@@ -2196,31 +2185,31 @@
                   {
                      replicator.largeMessageDelete(largeServerMessageImpl.getMessageID());
                   }
-                    file.delete();
-                }
+                  file.delete();
+               }
                finally
                {
                   readUnLock();
                }
             }
             catch (Exception e)
-                {
-                    JournalStorageManager.log.warn(e.getMessage(), e);
-                }
-                }
-
-            };
-
-        if (executor == null)
             {
-                deleteAction.run();
+               JournalStorageManager.log.warn(e.getMessage(), e);
             }
-        else
-            {
-                executor.execute(deleteAction);
-            }
-    }
+         }
 
+      };
+
+      if (executor == null)
+      {
+         deleteAction.run();
+      }
+      else
+      {
+         executor.execute(deleteAction);
+      }
+   }
+
     /**
      * @param messageID
      * @return
@@ -2542,33 +2531,30 @@
             }
     }
 
-    /**
-     * @throws Exception
-     */
-    private void cleanupIncompleteFiles() throws Exception
-    {
-        if (largeMessagesFactory != null)
-            {
-                List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
-                for (String tmpFile : tmpFiles)
-                    {
-                        SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
-                        file.delete();
-                    }
-            }
-    }
+   private void cleanupIncompleteFiles() throws Exception
+   {
+      if (largeMessagesFactory != null)
+      {
+         List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
+         for (String tmpFile : tmpFiles)
+         {
+            SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
+            file.delete();
+         }
+      }
+   }
 
-    private OperationContext getContext(final boolean sync)
-    {
-        if (sync)
-            {
-                return getContext();
-            }
-        else
-            {
-                return DummyOperationContext.getInstance();
-            }
-    }
+   private OperationContext getContext(final boolean sync)
+   {
+      if (sync)
+      {
+         return getContext();
+      }
+      else
+      {
+         return DummyOperationContext.getInstance();
+      }
+   }
 
     private static ClassLoader getThisClassLoader()
     {
@@ -2714,14 +2700,11 @@
 
        public boolean isCommit;
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
-       }
+      public String toString()
+      {
+         return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
+      }
 
        HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
        {
@@ -2807,15 +2790,12 @@
             return clusterName;
         }
 
-        /* (non-Javadoc)
-         * @see java.lang.Object#toString()
-         */
       @Override
-          public String toString()
-        {
-            return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
-        }
-    }
+      public String toString()
+      {
+         return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
+      }
+   }
 
    public static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
     {
@@ -2831,21 +2811,12 @@
         {
         }
 
-        /* (non-Javadoc)
-         * @see java.lang.Object#toString()
-         */
       @Override
-          public String toString()
-        {
-         return "PersistentQueueBindingEncoding [id=" + id +
-                ", name=" +
-                name +
-                ", address=" +
-                address +
-                ", filterString=" +
-                filterString +
-             "]";
-        }
+      public String toString()
+      {
+         return "PersistentQueueBindingEncoding [id=" + id + ", name=" + name + ", address=" + address +
+                  ", filterString=" + filterString + "]";
+      }
 
         public PersistentQueueBindingEncoding(final SimpleString name,
                                               final SimpleString address,
@@ -3017,15 +2988,11 @@
            return 8 + 4;
        }
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
-       }
-
+      public String toString()
+      {
+         return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
+      }
    }
 
    public static class QueueEncoding implements EncodingSupport
@@ -3058,33 +3025,30 @@
            return 8;
        }
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "QueueEncoding [queueID=" + queueID + "]";
-       }
+      public String toString()
+      {
+         return "QueueEncoding [queueID=" + queueID + "]";
+      }
 
    }
 
    private static class DeleteEncoding extends QueueEncoding
    {
-       public byte recordType;
+      public byte recordType;
 
-       public long id;
+      public long id;
 
-       public DeleteEncoding()
-       {
-           super();
-       }
+      public DeleteEncoding()
+      {
+         super();
+      }
 
-       public DeleteEncoding(final byte recordType, final long id)
-       {
-           this.recordType = recordType;
-           this.id = id;
-       }
+      public DeleteEncoding(final byte recordType, final long id)
+      {
+         this.recordType = recordType;
+         this.id = id;
+      }
 
        /* (non-Javadoc)
         * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
@@ -3132,18 +3096,15 @@
    public static class PageUpdateTXEncoding implements EncodingSupport
    {
 
-       public long pageTX;
+      public long pageTX;
 
-       public int recods;
+      public int recods;
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
-       }
+      public String toString()
+      {
+         return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
+      }
 
        public PageUpdateTXEncoding()
        {
@@ -3188,14 +3149,11 @@
    {
        long scheduledDeliveryTime;
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
-       }
+      public String toString()
+      {
+         return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
+      }
 
        private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID)
        {
@@ -3270,12 +3228,9 @@
            return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
        }
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
+      public String toString()
+      {
            // this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
            // and this may be useful to validate the journal on those tests
            // You may uncomment these two lines on that case and replcate the toString for the PrintData
@@ -3341,14 +3296,11 @@
    private static final class PageCountRecord implements EncodingSupport
    {
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
-       }
+      public String toString()
+      {
+         return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
+      }
 
        PageCountRecord()
        {
@@ -3396,14 +3348,11 @@
    private static final class PageCountRecordInc implements EncodingSupport
    {
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
-       }
+      public String toString()
+      {
+         return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
+      }
 
        PageCountRecordInc()
        {
@@ -3475,14 +3424,11 @@
            this.position = new PagePositionImpl();
        }
 
-       /* (non-Javadoc)
-        * @see java.lang.Object#toString()
-        */
       @Override
-          public String toString()
-       {
-           return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
-       }
+      public String toString()
+      {
+         return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
+      }
 
        public long queueID;
 
@@ -3520,57 +3466,50 @@
 
    private class LargeMessageTXFailureCallback implements TransactionFailureCallback
    {
-       private final Map<Long, ServerMessage> messages;
+      private final Map<Long, ServerMessage> messages;
 
-       public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages)
-       {
-           super();
-           this.messages = messages;
-       }
+      public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages)
+      {
+         super();
+         this.messages = messages;
+      }
 
-       public void failedTransaction(final long transactionID,
-                                     final List<RecordInfo> records,
-                                     final List<RecordInfo> recordsToDelete)
-       {
-           for (RecordInfo record : records)
-               {
-                   if (record.userRecordType == JournalStorageManager.ADD_LARGE_MESSAGE)
-                       {
-                           byte[] data = record.data;
+      public void failedTransaction(final long transactionID, final List<RecordInfo> records,
+         final List<RecordInfo> recordsToDelete)
+      {
+         for (RecordInfo record : records)
+         {
+            if (record.userRecordType == JournalStorageManager.ADD_LARGE_MESSAGE)
+            {
+               byte[] data = record.data;
 
-                           HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+               HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
 
                try
-                   {
-                       LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
-                       serverMessage.decrementDelayDeletionCount();
-                   }
+               {
+                  LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
+                  serverMessage.decrementDelayDeletionCount();
+               }
                catch (Exception e)
-                   {
-                       JournalStorageManager.log.warn(e.getMessage(), e);
-                   }
-                       }
+               {
+                  JournalStorageManager.log.warn(e.getMessage(), e);
                }
-       }
+            }
+         }
+      }
+   }
 
+   private static String describeRecord(RecordInfo info)
+   {
+      return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" +
+               newObjectEncoding(info);
    }
 
-    private static String describeRecord(RecordInfo info)
-    {
-      return "recordID=" + info.id +
-             ";userRecordType=" +
-             info.userRecordType +
-             ";isUpdate=" +
-             info.isUpdate +
-             ";" +
-          newObjectEncoding(info);
-    }
+   private static String describeRecord(RecordInfo info, Object o)
+   {
+      return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
+   }
 
-    private static String describeRecord(RecordInfo info, Object o)
-    {
-        return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
-    }
-
     // Encoding functions for binding Journal
 
     public static Object newObjectEncoding(RecordInfo info)
@@ -3861,220 +3800,216 @@
         return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
     }
 
-    /**
-     * @param fileFactory
-     * @param journal
-     * @throws Exception
-     */
+   /**
+    * @param fileFactory
+    * @param journal
+    * @throws Exception
+    */
    private static void describeJournal(SequentialFileFactory fileFactory, JournalImpl journal) throws Exception
-    {
-        List<JournalFile> files = journal.orderFiles();
+   {
+      List<JournalFile> files = journal.orderFiles();
 
-        final PrintStream out = System.out;
+      final PrintStream out = System.out;
 
-        for (JournalFile file : files)
-            {
-                out.println("#" + file);
+      for (JournalFile file : files)
+      {
+         out.println("#" + file);
 
-                JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
-                    {
+         JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+         {
 
-                        public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
-                        {
-                            out.println("operation at UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo));
-                        }
+            public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+            {
+               out.println("operation at UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo));
+            }
 
-                        public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
-                        {
-                            out.println("operation at Update;" + describeRecord(recordInfo));
-                        }
+            public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+            {
+               out.println("operation at Update;" + describeRecord(recordInfo));
+            }
 
-                        public void onReadRollbackRecord(final long transactionID) throws Exception
-                        {
-                            out.println("operation at Rollback;txID=" + transactionID);
-                        }
+            public void onReadRollbackRecord(final long transactionID) throws Exception
+            {
+               out.println("operation at Rollback;txID=" + transactionID);
+            }
 
-                        public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
-                        {
-                            out.println("operation at Prepare,txID=" + transactionID +
-                           ",numberOfRecords=" +
-                           numberOfRecords +
-                           ",extraData=" +
-                                        encode(extraData));
-                        }
+            public void
+               onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords)
+                  throws Exception
+            {
+               out.println("operation at Prepare,txID=" + transactionID + ",numberOfRecords=" + numberOfRecords +
+                        ",extraData=" + encode(extraData));
+            }
 
-                        public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
-                        {
-                            out.println("operation at DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
-                        }
+            public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+            {
+               out.println("operation at DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
+            }
 
-                        public void onReadDeleteRecord(final long recordID) throws Exception
-                        {
-                            out.println("operation at DeleteRecord;recordID=" + recordID);
-                        }
+            public void onReadDeleteRecord(final long recordID) throws Exception
+            {
+               out.println("operation at DeleteRecord;recordID=" + recordID);
+            }
 
-                        public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
-                        {
-                            out.println("operation at Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords);
-                        }
+            public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+            {
+               out.println("operation at Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords);
+            }
 
-                        public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
-                        {
-                            out.println("operation at AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
-                        }
+            public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+            {
+               out.println("operation at AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
+            }
 
-                        public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
-                        {
-                            out.println("operation at AddRecord;" + describeRecord(recordInfo));
-                        }
+            public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
+            {
+               out.println("operation at AddRecord;" + describeRecord(recordInfo));
+            }
 
-                        public void markAsDataFile(final JournalFile file)
-                        {
-                        }
-                    });
+            public void markAsDataFile(final JournalFile file)
+            {
             }
+         });
+      }
 
-        out.println();
+      out.println();
 
-        out.println("### Surviving Records Summary ###");
+      out.println("### Surviving Records Summary ###");
 
-        List<RecordInfo> records = new LinkedList<RecordInfo>();
-        List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
 
-        journal.start();
+      journal.start();
 
-        final StringBuffer bufferFailingTransactions = new StringBuffer();
+      final StringBuffer bufferFailingTransactions = new StringBuffer();
 
-        int messageCount = 0;
-        Map<Long, Integer> messageRefCounts = new HashMap<Long, Integer>();
-        int preparedMessageCount = 0;
-        Map<Long, Integer> preparedMessageRefCount = new HashMap<Long, Integer>();
-        journal.load(records, preparedTransactions, new TransactionFailureCallback()
-            {
+      int messageCount = 0;
+      Map<Long, Integer> messageRefCounts = new HashMap<Long, Integer>();
+      int preparedMessageCount = 0;
+      Map<Long, Integer> preparedMessageRefCount = new HashMap<Long, Integer>();
+      journal.load(records, preparedTransactions, new TransactionFailureCallback()
+      {
 
-                public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
-                {
-                    bufferFailingTransactions.append("Transaction " + transactionID + " failed with these records:\n");
-                    for (RecordInfo info : records)
-                        {
-                            bufferFailingTransactions.append("- " + describeRecord(info) + "\n");
-                        }
-
-                    for (RecordInfo info : recordsToDelete)
-                        {
-                            bufferFailingTransactions.append("- " + describeRecord(info) + " <marked to delete>\n");
-                        }
-
-                }
-            }, false);
-
-        for (RecordInfo info : records)
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+            bufferFailingTransactions.append("Transaction " + transactionID + " failed with these records:\n");
+            for (RecordInfo info : records)
             {
-                Object o = newObjectEncoding(info);
-                if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
-                    {
-                        messageCount++;
-                    }
-                else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
-                    {
-                        ReferenceDescribe ref = (ReferenceDescribe)o;
-                        Integer count = messageRefCounts.get(ref.refEncoding.queueID);
-                        if (count == null)
-                            {
-                                count = 1;
-                                messageRefCounts.put(ref.refEncoding.queueID, count);
-                            }
-                        else
-                            {
-                                messageRefCounts.put(ref.refEncoding.queueID, count + 1);
-                            }
-                    }
-                else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
-                    {
-                        AckDescribe ref = (AckDescribe)o;
-                        Integer count = messageRefCounts.get(ref.refEncoding.queueID);
-                        if (count == null)
-                            {
-                                messageRefCounts.put(ref.refEncoding.queueID, 0);
-                            }
-                        else
-                            {
-                                messageRefCounts.put(ref.refEncoding.queueID, count - 1);
-                            }
-                    }
-                out.println(describeRecord(info, o));
+               bufferFailingTransactions.append("- " + describeRecord(info) + "\n");
             }
 
-        out.println();
-        out.println("### Prepared TX ###");
-
-        for (PreparedTransactionInfo tx : preparedTransactions)
+            for (RecordInfo info : recordsToDelete)
             {
-                System.out.println(tx.id);
-                for (RecordInfo info : tx.records)
-                    {
-                        Object o = newObjectEncoding(info);
-                        out.println("- " + describeRecord(info, o));
-                        if (info.getUserRecordType() == 31)
-                            {
-                                preparedMessageCount++;
-                            }
-                        else if (info.getUserRecordType() == 32)
-                            {
-                                ReferenceDescribe ref = (ReferenceDescribe)o;
-                                Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
-                                if (count == null)
-                                    {
-                                        count = 1;
-                                        preparedMessageRefCount.put(ref.refEncoding.queueID, count);
-                                    }
-                                else
-                                    {
-                                        preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
-                                    }
-                            }
-                    }
-
-                for (RecordInfo info : tx.recordsToDelete)
-                    {
-                        out.println("- " + describeRecord(info) + " <marked to delete>");
-                    }
+               bufferFailingTransactions.append("- " + describeRecord(info) + " <marked to delete>\n");
             }
 
-        String missingTX = bufferFailingTransactions.toString();
+         }
+      }, false);
 
-        if (missingTX.length() > 0)
+      for (RecordInfo info : records)
+      {
+         Object o = newObjectEncoding(info);
+         if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
+         {
+            messageCount++;
+         }
+         else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+         {
+            ReferenceDescribe ref = (ReferenceDescribe)o;
+            Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+            if (count == null)
             {
-                out.println();
-                out.println("### Failed Transactions (Missing commit/prepare/rollback record) ###");
+               count = 1;
+               messageRefCounts.put(ref.refEncoding.queueID, count);
             }
-
-        out.println(bufferFailingTransactions.toString());
-
-        out.println("### Message Counts ###");
-        out.println("message count=" + messageCount);
-        out.println("message reference count");
-        for (Map.Entry<Long, Integer> longIntegerEntry : messageRefCounts.entrySet())
+            else
             {
-                System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+               messageRefCounts.put(ref.refEncoding.queueID, count + 1);
             }
+         }
+         else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+         {
+            AckDescribe ref = (AckDescribe)o;
+            Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+            if (count == null)
+            {
+               messageRefCounts.put(ref.refEncoding.queueID, 0);
+            }
+            else
+            {
+               messageRefCounts.put(ref.refEncoding.queueID, count - 1);
+            }
+         }
+         out.println(describeRecord(info, o));
+      }
 
-        out.println("prepared message count=" + preparedMessageCount);
+      out.println();
+      out.println("### Prepared TX ###");
 
-        for (Map.Entry<Long, Integer> longIntegerEntry : preparedMessageRefCount.entrySet())
+      for (PreparedTransactionInfo tx : preparedTransactions)
+      {
+         System.out.println(tx.id);
+         for (RecordInfo info : tx.records)
+         {
+            Object o = newObjectEncoding(info);
+            out.println("- " + describeRecord(info, o));
+            if (info.getUserRecordType() == 31)
             {
-                System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+               preparedMessageCount++;
             }
+            else if (info.getUserRecordType() == 32)
+            {
+               ReferenceDescribe ref = (ReferenceDescribe)o;
+               Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
+               if (count == null)
+               {
+                  count = 1;
+                  preparedMessageRefCount.put(ref.refEncoding.queueID, count);
+               }
+               else
+               {
+                  preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
+               }
+            }
+         }
 
-        journal.stop();
-    }
+         for (RecordInfo info : tx.recordsToDelete)
+         {
+            out.println("- " + describeRecord(info) + " <marked to delete>");
+         }
+      }
 
+      String missingTX = bufferFailingTransactions.toString();
+
+      if (missingTX.length() > 0)
+      {
+         out.println();
+         out.println("### Failed Transactions (Missing commit/prepare/rollback record) ###");
+      }
+
+      out.println(bufferFailingTransactions.toString());
+
+      out.println("### Message Counts ###");
+      out.println("message count=" + messageCount);
+      out.println("message reference count");
+      for (Map.Entry<Long, Integer> longIntegerEntry : messageRefCounts.entrySet())
+      {
+         System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+      }
+
+      out.println("prepared message count=" + preparedMessageCount);
+
+      for (Map.Entry<Long, Integer> longIntegerEntry : preparedMessageRefCount.entrySet())
+      {
+         System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+      }
+
+      journal.stop();
+   }
+
    @Override
-   public boolean addToPage(PagingManager pagingManager,
-      SimpleString address,
-      ServerMessage message,
-      RoutingContext ctx,
-      RouteContextList listCtx) throws Exception
+   public boolean addToPage(PagingManager pagingManager, SimpleString address, ServerMessage message,
+      RoutingContext ctx, RouteContextList listCtx) throws Exception
    {
       readLock();
       try
@@ -4088,16 +4023,17 @@
       }
    }
 
-    private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
-    {
-        TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
-        if (txoper == null)
-            {
-                txoper = new TXLargeMessageConfirmationOperation();
-                tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
-            }
-        txoper.confirmedMessages.add(recordID);
-    }
+   private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
+   {
+      TXLargeMessageConfirmationOperation txoper =
+               (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+      if (txoper == null)
+      {
+         txoper = new TXLargeMessageConfirmationOperation();
+         tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
+      }
+      txoper.confirmedMessages.add(recordID);
+   }
 
    class TXLargeMessageConfirmationOperation implements TransactionOperation
    {



More information about the hornetq-commits mailing list