Author: borges
Date: 2011-08-09 06:32:37 -0400 (Tue, 09 Aug 2011)
New Revision: 11163
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 More locking control over JournalStorageManager (unfinished)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 10:31:56 UTC (rev 11162)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 10:32:37 UTC (rev 11163)
@@ -600,6 +600,8 @@
}
readLock();
+ try
+ {
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
if (message.isLargeMessage())
@@ -618,19 +620,29 @@
false,
getContext(false));
}
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeReference(final long queueID, final long messageID, final boolean
last) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID),
last && syncNonTransactional,
getContext(last &&
syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
private void readLock()
@@ -646,17 +658,25 @@
public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID),
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeCursorAcknowledge(long queueID, PagePosition position) throws
Exception
{
readLock();
+ try
+ {
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecord(ackID,
@@ -664,52 +684,78 @@
new CursorAckRecordEncoding(queueID, position),
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteMessage(final long messageID) throws Exception
{
readLock();
+ try
+ {
// Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on the Executor
// increasing chances of losing deletes.
// The StorageManager should verify messages without references
messageJournal.appendDeleteRecord(messageID, false, getContext(false));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
- readLock();
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
-
+ readLock();
+ try
+ {
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final
long recordID) throws Exception
{
+ DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
readLock();
- DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
-
- messageJournal.appendAddRecord(recordID,
+ try
+ {
+ messageJournal.appendAddRecord(recordID,
JournalStorageManager.DUPLICATE_ID,
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteDuplicateID(final long recordID) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Transactional operations
@@ -717,6 +763,8 @@
public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
{
readLock();
+ try
+ {
if (message.getMessageID() <= 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was
not assigned to Message");
@@ -736,61 +784,95 @@
JournalStorageManager.ADD_MESSAGE,
message);
}
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storePageTransaction(final long txID, final PageTransactionInfo
pageTransaction) throws Exception
{
readLock();
+ try
+ {
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
pageTransaction);
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updatePageTransaction(final long txID, final PageTransactionInfo
pageTransaction, final int depages) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
depages));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int
depages) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new
PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeReferenceTransactional(final long txID, final long queueID, final
long messageID) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final
long messageID) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -798,12 +880,20 @@
*/
public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition
position) throws Exception
{
- long ackID = idGenerator.generateID();
- position.setRecordID(ackID);
- messageJournal.appendAddRecordTransactional(txID,
+ readLock();
+ try
+ {
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecordTransactional(txID,
ackID,
ACKNOWLEDGE_CURSOR,
new CursorAckRecordEncoding(queueID,
position));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -811,19 +901,34 @@
*/
public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws
Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, ackID);
+ readLock();
+ try
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, ackID);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws
Exception
{
- long id = generateUniqueID();
-
- messageJournal.appendAddRecord(id,
+ readLock();
+ try
+ {
+ long id = generateUniqueID();
+ messageJournal.appendAddRecord(id,
JournalStorageManager.HEURISTIC_COMPLETION,
new HeuristicCompletionEncoding(xid, isCommit),
true,
getContext(true));
- return id;
+ return id;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteHeuristicCompletion(final long id) throws Exception
@@ -1430,12 +1535,28 @@
GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
groupBinding.getGroupId(),
groupBinding.getClusterName());
+ readLock();
+ try
+ {
bindingsJournal.appendAddRecord(groupBinding.getId(),
JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteGrouping(final GroupBinding groupBinding) throws Exception
{
+ readLock();
+ try
+ {
bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Bindings operations
@@ -1451,16 +1572,31 @@
PersistentQueueBindingEncoding bindingEncoding = new
PersistentQueueBindingEncoding(queue.getName(),
binding.getAddress(),
filterString);
-
+ readLock();
+ try
+ {
bindingsJournal.appendAddRecord(binding.getID(),
JournalStorageManager.QUEUE_BINDING_RECORD,
bindingEncoding,
true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteQueueBinding(final long queueBindingID) throws Exception
{
+ readLock();
+ try
+ {
bindingsJournal.appendDeleteRecord(queueBindingID, true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -1468,12 +1604,20 @@
*/
public long storePageCounterInc(long txID, long queueID, int value) throws Exception
{
+ readLock();
+ try
+ {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
new PageCountRecordInc(queueID,
value));
- return recordID;
+ return recordID;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -1481,6 +1625,9 @@
*/
public long storePageCounterInc(long queueID, int value) throws Exception
{
+ readLock();
+ try
+ {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecord(recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
@@ -1488,6 +1635,11 @@
true,
getContext());
return recordID;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -1495,6 +1647,9 @@
*/
public long storePageCounter(long txID, long queueID, long value) throws Exception
{
+ readLock();
+ try
+ {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
recordID,
@@ -1502,22 +1657,43 @@
new PageCountRecord(queueID, value));
return recordID;
}
+ finally
+ {
+ readUnLock();
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
*/
public void deleteIncrementRecord(long txID, long recordID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
+ finally
+ {
+ readUnLock();
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
*/
public void deletePageCounter(long txID, long recordID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
+ finally
+ {
+ readUnLock();
+ }
+ }
public static void describeBindingJournal(final String bindingsDir) throws Exception
{
@@ -1606,7 +1782,15 @@
*/
public void lineUpContext()
{
+ readLock();
+ try
+ {
messageJournal.lineUpContex(getContext());
+ }
+ finally
+ {
+ readUnLock();
+ }
}
@@ -1673,10 +1857,18 @@
*/
public JournalLoadInformation[] loadInternalOnly() throws Exception
{
- JournalLoadInformation[] info = new JournalLoadInformation[2];
+ readLock();
+ try
+ {
+ JournalLoadInformation[] info = new JournalLoadInformation[2];
info[0] = bindingsJournal.loadInternalOnly();
info[1] = messageJournal.loadInternalOnly();
return info;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Public -----------------------------------------------------------------------------------