[hornetq-commits] JBoss hornetq SVN: r11165 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 9 09:03:23 EDT 2011


Author: borges
Date: 2011-08-09 09:03:23 -0400 (Tue, 09 Aug 2011)
New Revision: 11165

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 Add locking to methods that still missed it.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-09 13:02:46 UTC (rev 11164)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-09 13:03:23 UTC (rev 11165)
@@ -762,29 +762,29 @@
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
    {
-      readLock();
-      try
-      {
       if (message.getMessageID() <= 0)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
       }
 
-      if (message.isLargeMessage())
+      readLock();
+      try
       {
-         messageJournal.appendAddRecordTransactional(txID,
+         if (message.isLargeMessage())
+         {
+            messageJournal.appendAddRecordTransactional(txID,
                                                      message.getMessageID(),
                                                      JournalStorageManager.ADD_LARGE_MESSAGE,
                                                      new LargeMessageEncoding(((LargeServerMessage)message)));
-      }
-      else
-      {
-         messageJournal.appendAddRecordTransactional(txID,
+         }
+         else
+         {
+            messageJournal.appendAddRecordTransactional(txID,
                                                      message.getMessageID(),
                                                      JournalStorageManager.ADD_MESSAGE,
                                                      message);
+         }
       }
-      }
       finally
       {
          readUnLock();
@@ -933,33 +933,74 @@
 
    public void deleteHeuristicCompletion(final long id) throws Exception
    {
+      readLock();
+      try
+      {
+
       messageJournal.appendDeleteRecord(id, true, getContext(true));
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deletePageTransactional(final long recordID) throws Exception
    {
+      readLock();
+      try
+      {
       messageJournal.appendDeleteRecord(recordID, false);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
    {
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
                                                                                                             .getID());
+      readLock();
+      try
+      {
 
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      ref.getMessage().getMessageID(),
                                                      JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
                                                      encoding);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
+      readLock();
+      try
+      {
       messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void prepare(final long txID, final Xid xid) throws Exception
    {
+      readLock();
+      try
+      {
       messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional));
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void commit(final long txID) throws Exception
@@ -969,17 +1010,33 @@
 
    public void commit(final long txID, final boolean lineUpContext) throws Exception
    {
+      readLock();
+      try
+      {
       messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
       if (!lineUpContext && !syncTransactional)
       {
          // if lineUpContext == false, we have previously lined up a context, hence we need to mark it as done even if syncTransactional = false
          getContext(true).done();
+         }
       }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void rollback(final long txID) throws Exception
    {
+      readLock();
+      try
+      {
       messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional));
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void storeDuplicateIDTransactional(final long txID,
@@ -989,7 +1046,15 @@
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
+      readLock();
+      try
+      {
       messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.DUPLICATE_ID, encoding);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void updateDuplicateIDTransactional(final long txID,
@@ -999,12 +1064,28 @@
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
+      readLock();
+      try
+      {
       messageJournal.appendUpdateRecordTransactional(txID, recordID, JournalStorageManager.DUPLICATE_ID, encoding);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      readLock();
+      try
+      {
+         messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    // Other operations
@@ -1019,7 +1100,10 @@
          DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
                                                                                   ref.getDeliveryCount());
 
-         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+         readLock();
+         try
+         {
+            messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                            JournalStorageManager.UPDATE_DELIVERY_COUNT,
                                            updateInfo,
 
@@ -1027,15 +1111,28 @@
                                            getContext(syncNonTransactional));
       }
 
+         finally
+         {
+            readUnLock();
+         }
+      }
    }
 
    public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
    {
       deleteAddressSetting(addressSetting.getAddressMatch());
+      readLock();
+      try
+      {
       long id = idGenerator.generateID();
       addressSetting.setStoreId(id);
       bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
       mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
@@ -1062,10 +1159,18 @@
    {
 
       deleteSecurityRoles(persistedRoles.getAddressMatch());
-      long id = idGenerator.generateID();
+      readLock();
+      try
+      {
+         final long id = idGenerator.generateID();
       persistedRoles.setStoreId(id);
       bindingsJournal.appendAddRecord(id, SECURITY_RECORD, persistedRoles, true);
       mapPersistedRoles.put(persistedRoles.getAddressMatch(), persistedRoles);
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    public void deleteAddressSetting(SimpleString addressMatch) throws Exception
@@ -1073,7 +1178,15 @@
       PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
       if (oldSetting != null)
       {
-         bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+         readLock();
+         try
+         {
+            bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+         }
+         finally
+         {
+            readUnLock();
+         }
       }
 
    }
@@ -1083,7 +1196,15 @@
       PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
       if (oldRoles != null)
       {
-         bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+         readLock();
+         try
+         {
+            bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+         }
+         finally
+         {
+            readUnLock();
+         }
       }
    }
 
@@ -1099,6 +1220,9 @@
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+      readLock();
+      try
+      {
 
       JournalLoadInformation info = messageJournal.load(records,
                                                         preparedTransactions,
@@ -1498,6 +1622,11 @@
       }
       journalLoaded = true;
       return info;
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    /**
@@ -1506,7 +1635,7 @@
     * @param queueInfos
     * @return
     */
-   private PageSubscription locateSubscription(final long queueID,
+   private static PageSubscription locateSubscription(final long queueID,
                                                final Map<Long, PageSubscription> pageSubscriptions,
                                                final Map<Long, QueueBindingInfo> queueInfos,
                                                final PagingManager pagingManager) throws Exception
@@ -1538,7 +1667,8 @@
       readLock();
       try
       {
-      bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
+         bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding,
+                                         true);
       }
       finally
       {
@@ -1551,7 +1681,7 @@
       readLock();
       try
       {
-      bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+         bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
       }
       finally
       {
@@ -1628,13 +1758,13 @@
       readLock();
       try
       {
-      long recordID = idGenerator.generateID();
-      messageJournal.appendAddRecord(recordID,
+         final long recordID = idGenerator.generateID();
+         messageJournal.appendAddRecord(recordID,
                                      JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
                                      new PageCountRecordInc(queueID, value),
                                      true,
                                      getContext());
-      return recordID;
+         return recordID;
       }
       finally
       {
@@ -1650,7 +1780,7 @@
       readLock();
       try
       {
-      long recordID = idGenerator.generateID();
+         final long recordID = idGenerator.generateID();
       messageJournal.appendAddRecordTransactional(txID,
                                                   recordID,
                                                   JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
@@ -1671,8 +1801,8 @@
       readLock();
       try
       {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID);
-   }
+         messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      }
       finally
       {
          readUnLock();
@@ -1687,8 +1817,8 @@
       readLock();
       try
       {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID);
-   }
+         messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      }
       finally
       {
          readUnLock();
@@ -1785,7 +1915,7 @@
       readLock();
       try
       {
-      messageJournal.lineUpContex(getContext());
+         messageJournal.lineUpContex(getContext());
       }
       finally
       {
@@ -1861,9 +1991,9 @@
       try
       {
          JournalLoadInformation[] info = new JournalLoadInformation[2];
-      info[0] = bindingsJournal.loadInternalOnly();
-      info[1] = messageJournal.loadInternalOnly();
-      return info;
+         info[0] = bindingsJournal.loadInternalOnly();
+         info[1] = messageJournal.loadInternalOnly();
+         return info;
       }
       finally
       {



More information about the hornetq-commits mailing list