Author: borges
Date: 2011-10-05 12:49:18 -0400 (Wed, 05 Oct 2011)
New Revision: 11476
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
HORNETQ-720 Add a compactorLock to JournalImpl
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -365,39 +365,34 @@
final Journal localMessageJournal = messageJournal;
final Journal localBindingsJournal = bindingsJournal;
- final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
- final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
Map<String, Long> largeMessageFilesToSync;
Map<SimpleString, Collection<Integer>> pageFilesToSync;
+ storageManagerLock.writeLock().lock();
try
{
- storageManagerLock.writeLock().lock();
+ replicator = replicationManager;
+ localMessageJournal.synchronizationLock();
+ localBindingsJournal.synchronizationLock();
try
{
- replicator = replicationManager;
-
- localMessageJournal.writeLock();
- localBindingsJournal.writeLock();
+ pagingManager.lockAll();
try
{
- pagingManager.lockAll();
- try
- {
- messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
- pageFilesToSync = getPageInformationForSync(pagingManager);
- largeMessageFilesToSync = getLargeMessageInformation();
- }
- finally
- {
- pagingManager.unlockAll();
- }
+ messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ pageFilesToSync = getPageInformationForSync(pagingManager);
+ largeMessageFilesToSync = getLargeMessageInformation();
}
finally
{
- localMessageJournal.writeUnlock();
- localBindingsJournal.writeUnlock();
+ pagingManager.unlockAll();
}
+ }
+ finally
+ {
+ localMessageJournal.synchronizationUnlock();
+ localBindingsJournal.synchronizationUnlock();
+ }
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal,
replicator);
}
@@ -421,14 +416,9 @@
{
storageManagerLock.writeLock().unlock();
}
- }
- finally
- {
- localMessageJournal.setAutoReclaim(messageJournalAutoReclaim);
- localBindingsJournal.setAutoReclaim(bindingsJournalAutoReclaim);
- }
}
+
/**
* @param pageFilesToSync
* @throws Exception
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -587,30 +587,18 @@
}
@Override
- public boolean getAutoReclaim()
+ public void synchronizationLock()
{
throw new UnsupportedOperationException();
}
@Override
- public void writeLock()
+ public void synchronizationUnlock()
{
throw new UnsupportedOperationException();
}
@Override
- public void writeUnlock()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setAutoReclaim(boolean autoReclaim)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void forceMoveNextFile()
{
throw new UnsupportedOperationException();
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -386,7 +386,7 @@
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
- journal.writeLock();
+ journal.synchronizationLock();
try
{
if (journal.getDataFiles().length != 0)
@@ -401,7 +401,7 @@
}
finally
{
- journal.writeUnlock();
+ journal.synchronizationUnlock();
}
}
synchronized (largeMessagesOnSync)
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -156,28 +156,18 @@
Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws
Exception;
/**
- * @return whether automatic reclaiming of Journal files is enabled
+ * Write lock the Journal and write lock the compacting process. Necessary only during
+ * replication for backup synchronization.
*/
- boolean getAutoReclaim();
+ void synchronizationLock();
/**
- * Write lock the Journal. Necessary only during replication for backup synchronization.
+ * Unlock the Journal and the compacting process.
+ * @see Journal#synchronizationLock()
*/
- void writeLock();
+ void synchronizationUnlock();
/**
- * Write-unlock the Journal.
- * @see Journal#writeLock()
- */
- void writeUnlock();
-
- /**
- * Sets whether the journal should auto-reclaim its internal files.
- * @param autoReclaim
- */
- void setAutoReclaim(boolean autoReclaim);
-
- /**
* Force the usage of a new {@link JournalFile}.
* @throws Exception
*/
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -52,4 +52,15 @@
/** This method is called automatically when a new file is opened.
* @return true if it needs to re-check due to cleanup or other factors */
boolean checkReclaimStatus() throws Exception;
+
+ /**
+ * @return whether automatic reclaiming of Journal files is enabled
+ */
+ boolean getAutoReclaim();
+
+ /**
+ * Sets whether the journal should auto-reclaim its internal files.
+ * @param autoReclaim
+ */
+ void setAutoReclaim(boolean autoReclaim);
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -277,30 +277,18 @@
}
@Override
- public boolean getAutoReclaim()
+ public void synchronizationLock()
{
throw new UnsupportedOperationException();
}
@Override
- public void writeLock()
+ public void synchronizationUnlock()
{
throw new UnsupportedOperationException();
}
@Override
- public void writeUnlock()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setAutoReclaim(boolean autoReclaim)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void forceMoveNextFile()
{
throw new UnsupportedOperationException();
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -206,6 +206,7 @@
* However we need to lock it while taking and updating snapshots
*/
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+ private final ReadWriteLock compactorLock = new ReentrantReadWriteLock();
private volatile JournalState state = JournalState.STOPPED;
@@ -1427,6 +1428,9 @@
throw new IllegalStateException("There is pending compacting
operation");
}
+ compactorLock.writeLock().lock();
+ try
+ {
ArrayList<JournalFile> dataFilesToProcess = new
ArrayList<JournalFile>(filesRepository.getDataFilesCount());
boolean previousReclaimValue = autoReclaim;
@@ -1455,7 +1459,7 @@
return;
}
- onCompactLock();
+ onCompactLockingTheJournal();
setAutoReclaim(false);
@@ -1534,7 +1538,7 @@
// Need to clear the compactor here, or the replay commands will send
commands back (infinite loop)
compactor = null;
- onCompactLock();
+ onCompactLockingTheJournal();
newDatafiles = localCompactor.getNewDataFiles();
@@ -1626,8 +1630,12 @@
compactor = null;
}
setAutoReclaim(previousReclaimValue);
+ }
}
-
+ finally
+ {
+ compactorLock.writeLock().unlock();
+ }
}
/**
@@ -2145,11 +2153,18 @@
// TestableJournal implementation
// --------------------------------------------------------------
+ @Override
public synchronized void setAutoReclaim(final boolean autoReclaim)
{
this.autoReclaim = autoReclaim;
}
+ @Override
+ public boolean getAutoReclaim()
+ {
+ return autoReclaim;
+ }
+
public String debug() throws Exception
{
reclaimer.scan(getDataFiles());
@@ -2493,7 +2508,7 @@
/** This is an interception point for testcases, when the compacted files are written, to be called
* as soon as the compactor gets a writeLock */
- protected void onCompactLock() throws Exception
+ protected void onCompactLockingTheJournal() throws Exception
{
}
@@ -2973,14 +2988,22 @@
}
}
- public void writeLock()
+ public void synchronizationLock()
{
+ compactorLock.writeLock().lock();
journalLock.writeLock().lock();
}
- public void writeUnlock()
+ public void synchronizationUnlock()
{
- journalLock.writeLock().unlock();
+ try
+ {
+ compactorLock.writeLock().unlock();
+ }
+ finally
+ {
+ journalLock.writeLock().unlock();
+ }
}
/**
@@ -2991,7 +3014,7 @@
@Override
public synchronized Map<Long, JournalFile> createFilesForBackupSync(long[]
fileIds) throws Exception
{
- writeLock();
+ synchronizationLock();
try
{
Map<Long, JournalFile> map = new HashMap<Long, JournalFile>();
@@ -3007,15 +3030,10 @@
}
finally
{
- writeUnlock();
+ synchronizationUnlock();
}
}
- public boolean getAutoReclaim()
- {
- return autoReclaim;
- }
-
@Override
public SequentialFileFactory getFileFactory()
{
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -861,30 +861,18 @@
}
@Override
- public boolean getAutoReclaim()
+ public void synchronizationLock()
{
- return false;
- }
- @Override
- public void writeLock()
- {
-
}
@Override
- public void writeUnlock()
+ public void synchronizationUnlock()
{
}
@Override
- public void setAutoReclaim(boolean autoReclaim)
- {
-
- }
-
- @Override
public void forceMoveNextFile() throws Exception
{
Modified:
branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -117,7 +117,7 @@
"hq",
maxAIO)
{
- protected void onCompactLock() throws Exception
+ protected void onCompactLockingTheJournal() throws Exception
{
}
Modified:
branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2011-10-05
16:46:27 UTC (rev 11475)
+++
branches/HORNETQ-720_Replication/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2011-10-05
16:49:18 UTC (rev 11476)
@@ -132,7 +132,7 @@
"hq",
maxAIO)
{
- protected void onCompactLock() throws Exception
+ protected void onCompactLockingTheJournal() throws Exception
{
}