Author: borges
Date: 2011-08-01 09:50:45 -0400 (Mon, 01 Aug 2011)
New Revision: 11085
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 INCOMPLETE: Lock StorageManager before re-assigning journal fields.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 13:50:45 UTC (rev 11085)
@@ -32,6 +32,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.Xid;
@@ -155,6 +156,7 @@
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
private final BatchingIDGenerator idGenerator;
+ private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
private ReplicationManager replicator;
@@ -361,31 +363,37 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"journals here are not JournalImpl. You can't set a replicator!");
}
+ JournalFile[] messageFiles = null;
+ JournalFile[] bindingsFiles = null;
+
// XXX HORNETQ-720 WRITE LOCK the StorageManager.
+ storageManagerLock.writeLock().lock();
+ try
+ {
+ final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+ final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
- final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
- final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
- JournalFile[] messageFiles;
- JournalFile[] bindingsFiles;
-
- localMessageJournal.writeLock();
- localBindingsJournal.writeLock();
- try
- {
- messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ localMessageJournal.writeLock();
+ localBindingsJournal.writeLock();
+ try
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ }
+ finally
+ {
+ localMessageJournal.writeUnlock();
+ localBindingsJournal.writeUnlock();
+ }
+ bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessageJournal,
replicator);
}
finally
{
- localMessageJournal.writeUnlock();
- localBindingsJournal.writeUnlock();
+ // XXX HORNETQ-720 UNLOCK StorageManager...
+ storageManagerLock.writeLock().unlock();
}
- bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
- messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
-
- // XXX HORNETQ-720 UNLOCK StorageManager...
-
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
@@ -571,6 +579,7 @@
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
+ readLock();
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
if (message.isLargeMessage())
@@ -589,29 +598,45 @@
false,
getContext(false));
}
+ readUnLock();
}
public void storeReference(final long queueID, final long messageID, final boolean
last) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID),
last && syncNonTransactional,
getContext(last &&
syncNonTransactional));
+ readUnLock();
}
+ private void readLock()
+ {
+ storageManagerLock.readLock().lock();
+ }
+
+ private void readUnLock()
+ {
+ storageManagerLock.readLock().unlock();
+ }
+
public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
{
+ readLock();
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID),
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void storeCursorAcknowledge(long queueID, PagePosition position) throws
Exception
{
+ readLock();
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecord(ackID,
@@ -619,19 +644,23 @@
new CursorAckRecordEncoding(queueID, position),
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void deleteMessage(final long messageID) throws Exception
{
+ readLock();
// Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on the Executor
// increasing chances of losing deletes.
// The StorageManager should verify messages without references
messageJournal.appendDeleteRecord(messageID, false, getContext(false));
+ readUnLock();
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
+ readLock();
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
@@ -640,10 +669,12 @@
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final
long recordID) throws Exception
{
+ readLock();
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
messageJournal.appendAddRecord(recordID,
@@ -651,17 +682,21 @@
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void deleteDuplicateID(final long recordID) throws Exception
{
+ readLock();
messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
{
+ readLock();
if (message.getMessageID() <= 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
@@ -681,51 +716,61 @@
JournalStorageManager.ADD_MESSAGE,
message);
}
-
+ readUnLock();
}
public void storePageTransaction(final long txID, final PageTransactionInfo
pageTransaction) throws Exception
{
+ readLock();
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
pageTransaction);
+ readUnLock();
}
public void updatePageTransaction(final long txID, final PageTransactionInfo
pageTransaction, final int depages) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
depages));
+ readUnLock();
}
public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int
depages) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new
PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void storeReferenceTransactional(final long txID, final long queueID, final
long messageID) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID));
+ readUnLock();
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final
long messageID) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID));
+ readUnLock();
}
/* (non-Javadoc)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 13:50:45 UTC (rev 11085)
@@ -378,6 +378,7 @@
journal.finishRemoteBackupSync();
}
server.setRemoteBackupUpToDate();
+ log.info("Backup server " + server + " is synchronized with live-server.");
return;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 13:50:45 UTC (rev 11085)
@@ -580,7 +580,7 @@
}
catch (Exception e)
{
- log.warn("unable to announce backup for replication", e);
+ log.warn("Unable to announce backup for replication.", e);
}
}
});