[hornetq-commits] JBoss hornetq SVN: r11163 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 9 06:32:37 EDT 2011


Author: borges
Date: 2011-08-09 06:32:37 -0400 (Tue, 09 Aug 2011)
New Revision: 11163

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 More locking control over JournalStorageManager (unfinished)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-09 10:31:56 UTC (rev 11162)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-09 10:32:37 UTC (rev 11163)
@@ -600,6 +600,8 @@
       }
 
       readLock();
+      try
+      {
       // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
 
       if (message.isLargeMessage())
@@ -618,19 +620,29 @@
                                         false,
                                         getContext(false));
       }
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
 
    {
       readLock();
+      try
+      {
       messageJournal.appendUpdateRecord(messageID,
                                         JournalStorageManager.ADD_REF,
                                         new RefEncoding(queueID),
                                         last && syncNonTransactional,
                                         getContext(last && syncNonTransactional));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    private void readLock()
@@ -646,17 +658,25 @@
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception
    {
       readLock();
+      try
+      {
       messageJournal.appendUpdateRecord(messageID,
                                         JournalStorageManager.ACKNOWLEDGE_REF,
                                         new RefEncoding(queueID),
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
    {
       readLock();
+      try
+      {
       long ackID = idGenerator.generateID();
       position.setRecordID(ackID);
       messageJournal.appendAddRecord(ackID,
@@ -664,52 +684,78 @@
                                      new CursorAckRecordEncoding(queueID, position),
                                      syncNonTransactional,
                                      getContext(syncNonTransactional));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteMessage(final long messageID) throws Exception
    {
       readLock();
+      try
+      {
       // Messages are deleted on postACK, one after another.
       // If these deletes are synchronized, we would build up messages on the Executor
       // increasing chances of losing deletes.
       // The StorageManager should verify messages without references
       messageJournal.appendDeleteRecord(messageID, false, getContext(false));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
    {
-      readLock();
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
                                                                                                             .getID());
-
+      readLock();
+      try
+      {
       messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                         JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
                                         encoding,
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
+      DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
       readLock();
-      DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
-
-      messageJournal.appendAddRecord(recordID,
+      try
+      {
+         messageJournal.appendAddRecord(recordID,
                                      JournalStorageManager.DUPLICATE_ID,
                                      encoding,
                                      syncNonTransactional,
                                      getContext(syncNonTransactional));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteDuplicateID(final long recordID) throws Exception
    {
       readLock();
+      try
+      {
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    // Transactional operations
@@ -717,6 +763,8 @@
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
    {
       readLock();
+      try
+      {
       if (message.getMessageID() <= 0)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
@@ -736,61 +784,95 @@
                                                      JournalStorageManager.ADD_MESSAGE,
                                                      message);
       }
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
    {
       readLock();
+      try
+      {
       pageTransaction.setRecordID(generateUniqueID());
 
       messageJournal.appendAddRecordTransactional(txID,
                                                   pageTransaction.getRecordID(),
                                                   JournalStorageManager.PAGE_TRANSACTION,
                                                   pageTransaction);
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
    {
       readLock();
+      try
+      {
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      pageTransaction.getRecordID(),
                                                      JournalStorageManager.PAGE_TRANSACTION,
                                                      new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
                                                                               depages));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
    {
       readLock();
+      try
+      {
       messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
                                         JournalStorageManager.PAGE_TRANSACTION,
                                         new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
       readLock();
+      try
+      {
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      JournalStorageManager.ADD_REF,
                                                      new RefEncoding(queueID));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
       readLock();
+      try
+      {
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      JournalStorageManager.ACKNOWLEDGE_REF,
                                                      new RefEncoding(queueID));
-      readUnLock();
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    /* (non-Javadoc)
@@ -798,12 +880,20 @@
     */
    public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
    {
-      long ackID = idGenerator.generateID();
-      position.setRecordID(ackID);
-      messageJournal.appendAddRecordTransactional(txID,
+      readLock();
+      try
+      {
+         long ackID = idGenerator.generateID();
+         position.setRecordID(ackID);
+         messageJournal.appendAddRecordTransactional(txID,
                                                   ackID,
                                                   ACKNOWLEDGE_CURSOR,
                                                   new CursorAckRecordEncoding(queueID, position));
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    /* (non-Javadoc)
@@ -811,19 +901,34 @@
     */
    public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, ackID);
+      readLock();
+      try
+      {
+         messageJournal.appendDeleteRecordTransactional(txID, ackID);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
    {
-      long id = generateUniqueID();
-
-      messageJournal.appendAddRecord(id,
+      readLock();
+      try
+      {
+         long id = generateUniqueID();
+         messageJournal.appendAddRecord(id,
                                      JournalStorageManager.HEURISTIC_COMPLETION,
                                      new HeuristicCompletionEncoding(xid, isCommit),
                                      true,
                                      getContext(true));
-      return id;
+         return id;
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteHeuristicCompletion(final long id) throws Exception
@@ -1430,12 +1535,28 @@
       GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
                                                                groupBinding.getGroupId(),
                                                                groupBinding.getClusterName());
+      readLock();
+      try
+      {
       bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteGrouping(final GroupBinding groupBinding) throws Exception
    {
+      readLock();
+      try
+      {
       bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    // Bindings operations
@@ -1451,16 +1572,31 @@
       PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
                                                                                           binding.getAddress(),
                                                                                           filterString);
-
+      readLock();
+      try
+      {
       bindingsJournal.appendAddRecord(binding.getID(),
                                       JournalStorageManager.QUEUE_BINDING_RECORD,
                                       bindingEncoding,
                                       true);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteQueueBinding(final long queueBindingID) throws Exception
    {
+      readLock();
+      try
+      {
       bindingsJournal.appendDeleteRecord(queueBindingID, true);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    /* (non-Javadoc)
@@ -1468,12 +1604,20 @@
     */
    public long storePageCounterInc(long txID, long queueID, int value) throws Exception
    {
+      readLock();
+      try
+      {
       long recordID = idGenerator.generateID();
       messageJournal.appendAddRecordTransactional(txID,
                                                   recordID,
                                                   JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
                                                   new PageCountRecordInc(queueID, value));
-      return recordID;
+         return recordID;
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    /* (non-Javadoc)
@@ -1481,6 +1625,9 @@
     */
    public long storePageCounterInc(long queueID, int value) throws Exception
    {
+      readLock();
+      try
+      {
       long recordID = idGenerator.generateID();
       messageJournal.appendAddRecord(recordID,
                                      JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
@@ -1488,6 +1635,11 @@
                                      true,
                                      getContext());
       return recordID;
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    /* (non-Javadoc)
@@ -1495,6 +1647,9 @@
     */
    public long storePageCounter(long txID, long queueID, long value) throws Exception
    {
+      readLock();
+      try
+      {
       long recordID = idGenerator.generateID();
       messageJournal.appendAddRecordTransactional(txID,
                                                   recordID,
@@ -1502,22 +1657,43 @@
                                                   new PageCountRecord(queueID, value));
       return recordID;
    }
+      finally
+      {
+         readUnLock();
+      }
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
     */
    public void deleteIncrementRecord(long txID, long recordID) throws Exception
    {
+      readLock();
+      try
+      {
       messageJournal.appendDeleteRecordTransactional(txID, recordID);
    }
+      finally
+      {
+         readUnLock();
+      }
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
     */
    public void deletePageCounter(long txID, long recordID) throws Exception
    {
+      readLock();
+      try
+      {
       messageJournal.appendDeleteRecordTransactional(txID, recordID);
    }
+      finally
+      {
+         readUnLock();
+      }
+   }
 
    public static void describeBindingJournal(final String bindingsDir) throws Exception
    {
@@ -1606,7 +1782,15 @@
     */
    public void lineUpContext()
    {
+      readLock();
+      try
+      {
       messageJournal.lineUpContex(getContext());
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
 
@@ -1673,10 +1857,18 @@
     */
    public JournalLoadInformation[] loadInternalOnly() throws Exception
    {
-      JournalLoadInformation[] info = new JournalLoadInformation[2];
+      readLock();
+      try
+      {
+         JournalLoadInformation[] info = new JournalLoadInformation[2];
       info[0] = bindingsJournal.loadInternalOnly();
       info[1] = messageJournal.loadInternalOnly();
       return info;
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    // Public -----------------------------------------------------------------------------------



More information about the hornetq-commits mailing list