Author: borges
Date: 2011-08-09 09:03:23 -0400 (Tue, 09 Aug 2011)
New Revision: 11165
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 Add locking to methods that were still missing it.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09
13:02:46 UTC (rev 11164)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09
13:03:23 UTC (rev 11165)
@@ -762,29 +762,29 @@
public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
{
- readLock();
- try
- {
if (message.getMessageID() <= 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was
not assigned to Message");
}
- if (message.isLargeMessage())
+ readLock();
+ try
{
- messageJournal.appendAddRecordTransactional(txID,
+ if (message.isLargeMessage())
+ {
+ messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
JournalStorageManager.ADD_LARGE_MESSAGE,
new
LargeMessageEncoding(((LargeServerMessage)message)));
- }
- else
- {
- messageJournal.appendAddRecordTransactional(txID,
+ }
+ else
+ {
+ messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
JournalStorageManager.ADD_MESSAGE,
message);
+ }
}
- }
finally
{
readUnLock();
@@ -933,33 +933,74 @@
public void deleteHeuristicCompletion(final long id) throws Exception
{
+ readLock();
+ try
+ {
+
messageJournal.appendDeleteRecord(id, true, getContext(true));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deletePageTransactional(final long recordID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecord(recordID, false);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updateScheduledDeliveryTimeTransactional(final long txID, final
MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
+ readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
encoding);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteMessageTransactional(final long txID, final long queueID, final long
messageID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecordTransactional(txID, messageID, new
DeleteEncoding(queueID));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void prepare(final long txID, final Xid xid) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional,
getContext(syncTransactional));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void commit(final long txID) throws Exception
@@ -969,17 +1010,33 @@
public void commit(final long txID, final boolean lineUpContext) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendCommitRecord(txID, syncTransactional,
getContext(syncTransactional), lineUpContext);
if (!lineUpContext && !syncTransactional)
{
// if lineUpContext == false, we have previously lined up a context, hence we
need to mark it as done even if syncTransactional = false
getContext(true).done();
+ }
}
+ finally
+ {
+ readUnLock();
+ }
}
public void rollback(final long txID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendRollbackRecord(txID, syncTransactional,
getContext(syncTransactional));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeDuplicateIDTransactional(final long txID,
@@ -989,7 +1046,15 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
+ readLock();
+ try
+ {
messageJournal.appendAddRecordTransactional(txID, recordID,
JournalStorageManager.DUPLICATE_ID, encoding);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updateDuplicateIDTransactional(final long txID,
@@ -999,12 +1064,28 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
+ readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID, recordID,
JournalStorageManager.DUPLICATE_ID, encoding);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteDuplicateIDTransactional(final long txID, final long recordID)
throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ readLock();
+ try
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Other operations
@@ -1019,7 +1100,10 @@
DeliveryCountUpdateEncoding updateInfo = new
DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ readLock();
+ try
+ {
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.UPDATE_DELIVERY_COUNT,
updateInfo,
@@ -1027,15 +1111,28 @@
getContext(syncNonTransactional));
}
+ finally
+ {
+ readUnLock();
+ }
+ }
}
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws
Exception
{
deleteAddressSetting(addressSetting.getAddressMatch());
+ readLock();
+ try
+ {
long id = idGenerator.generateID();
addressSetting.setStoreId(id);
bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
@@ -1062,10 +1159,18 @@
{
deleteSecurityRoles(persistedRoles.getAddressMatch());
- long id = idGenerator.generateID();
+ readLock();
+ try
+ {
+ final long id = idGenerator.generateID();
persistedRoles.setStoreId(id);
bindingsJournal.appendAddRecord(id, SECURITY_RECORD, persistedRoles, true);
mapPersistedRoles.put(persistedRoles.getAddressMatch(), persistedRoles);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteAddressSetting(SimpleString addressMatch) throws Exception
@@ -1073,7 +1178,15 @@
PersistedAddressSetting oldSetting =
mapPersistedAddressSettings.remove(addressMatch);
if (oldSetting != null)
{
- bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+ readLock();
+ try
+ {
+ bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
}
@@ -1083,7 +1196,15 @@
PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
if (oldRoles != null)
{
- bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+ readLock();
+ try
+ {
+ bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
}
@@ -1099,6 +1220,9 @@
List<PreparedTransactionInfo> preparedTransactions = new
ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long,
ServerMessage>();
+ readLock();
+ try
+ {
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
@@ -1498,6 +1622,11 @@
}
journalLoaded = true;
return info;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/**
@@ -1506,7 +1635,7 @@
* @param queueInfos
* @return
*/
- private PageSubscription locateSubscription(final long queueID,
+ private static PageSubscription locateSubscription(final long queueID,
final Map<Long, PageSubscription>
pageSubscriptions,
final Map<Long, QueueBindingInfo>
queueInfos,
final PagingManager pagingManager) throws
Exception
@@ -1538,7 +1667,8 @@
readLock();
try
{
- bindingsJournal.appendAddRecord(groupBinding.getId(),
JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
+ bindingsJournal.appendAddRecord(groupBinding.getId(),
JournalStorageManager.GROUP_RECORD, groupingEncoding,
+ true);
}
finally
{
@@ -1551,7 +1681,7 @@
readLock();
try
{
- bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+ bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
}
finally
{
@@ -1628,13 +1758,13 @@
readLock();
try
{
- long recordID = idGenerator.generateID();
- messageJournal.appendAddRecord(recordID,
+ final long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecord(recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
new PageCountRecordInc(queueID, value),
true,
getContext());
- return recordID;
+ return recordID;
}
finally
{
@@ -1650,7 +1780,7 @@
readLock();
try
{
- long recordID = idGenerator.generateID();
+ final long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
@@ -1671,8 +1801,8 @@
readLock();
try
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
- }
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
finally
{
readUnLock();
@@ -1687,8 +1817,8 @@
readLock();
try
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
- }
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
finally
{
readUnLock();
@@ -1785,7 +1915,7 @@
readLock();
try
{
- messageJournal.lineUpContex(getContext());
+ messageJournal.lineUpContex(getContext());
}
finally
{
@@ -1861,9 +1991,9 @@
try
{
JournalLoadInformation[] info = new JournalLoadInformation[2];
- info[0] = bindingsJournal.loadInternalOnly();
- info[1] = messageJournal.loadInternalOnly();
- return info;
+ info[0] = bindingsJournal.loadInternalOnly();
+ info[1] = messageJournal.loadInternalOnly();
+ return info;
}
finally
{