Author: borges
Date: 2011-07-07 06:22:25 -0400 (Thu, 07 Jul 2011)
New Revision: 10944
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
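Use an enum (JournalState) for the journal state instead of numeric codes, and reduce code duplication by extracting the repeated "journal must be loaded" check into checkJournalIsLoaded().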
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-07
10:21:52 UTC (rev 10943)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-07
10:22:25 UTC (rev 10944)
@@ -65,26 +65,25 @@
import org.hornetq.utils.DataConstants;
/**
- *
+ *
* <p>A circular log implementation.</p
- *
+ *
* <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
*/
public class JournalImpl implements TestableJournal, JournalRecordProvider
{
+ private enum JournalState
+ {
+ STOPPED, STARTED, LOADED;
+ }
// Constants -----------------------------------------------------
- private static final int STATE_STOPPED = 0;
- private static final int STATE_STARTED = 1;
-
- private static final int STATE_LOADED = 2;
-
public static final int FORMAT_VERSION = 2;
private static final int COMPATIBLE_VERSIONS[] = new int[] { 1 };
@@ -210,7 +209,7 @@
private volatile JournalFile currentFile;
- private volatile int state;
+ private volatile JournalState state = JournalState.STOPPED;
private final Reclaimer reclaimer = new Reclaimer();
@@ -280,7 +279,7 @@
}
else
{
- this.compactPercentage = (float)compactPercentage / 100f;
+ this.compactPercentage = compactPercentage / 100f;
}
this.compactMinFiles = compactMinFiles;
@@ -378,7 +377,7 @@
return compactor;
}
- /** this method is used internally only however tools may use it to maintenance.
+ /** this method is used internally only however tools may use it to maintenance.
* It won't be part of the interface as the tools should be specific to the
implementation */
public List<JournalFile> orderFiles() throws Exception
{
@@ -424,7 +423,7 @@
{
final int filesize = (int)file.getFile().size();
- wholeFileBuffer = fileFactory.newBuffer((int)filesize);
+ wholeFileBuffer = fileFactory.newBuffer(filesize);
final int journalFileSize = file.getFile().read(wholeFileBuffer);
@@ -816,10 +815,7 @@
final boolean sync,
final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -891,10 +887,7 @@
final boolean sync,
final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -967,10 +960,7 @@
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion
callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
try
@@ -1046,10 +1036,7 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1088,6 +1075,14 @@
}
}
+ private void checkJournalIsLoaded()
+ {
+ if (state != JournalState.LOADED)
+ {
+ throw new IllegalStateException("Journal must be in state=" +
JournalState.LOADED + ", was [" + state + "]");
+ }
+ }
+
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -1101,10 +1096,7 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1150,10 +1142,7 @@
public void appendDeleteRecordTransactional(final long txID, final long id, final
EncodingSupport record) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1223,15 +1212,15 @@
}
}
- /**
- *
- * <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
+ /**
+ *
+ * <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
* back to a state it could be committed. </p>
- *
+ *
* <p> transactionData allows you to store any other supporting user-data
related to the transaction</p>
- *
+ *
* <p> This method also uses the same logic applied on {@link
JournalImpl#appendCommitRecord(long, boolean)}
- *
+ *
* @param txID
* @param transactionData extra user data for the prepare
* @throws Exception
@@ -1242,10 +1231,7 @@
final IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1310,30 +1296,27 @@
appendCommitRecord(txID, sync, callback, true);
}
-
+
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements
the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10
pendingTransactions on each file.
+ * <p>For example, a transaction was spread along 3 journal files with 10
pendingTransactions on each file.
* (What could happen if there are too many pendingTransactions, or if an user
event delayed pendingTransactions to come in time to a single file).</p>
* <p>The element-summary will then have</p>
* <p>FileID1, 10</p>
* <p>FileID2, 10</p>
* <p>FileID3, 10</p>
- *
+ *
* <br>
* <p> During the load, the transaction needs to have 30 pendingTransactions
spread across the files as originally written.</p>
* <p> If for any reason there are missing pendingTransactions, that means the
transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files
after the transaction was successfully committed.
+ * <p> We can't just use a global counter as reclaiming could delete files
after the transaction was successfully committed.
* That also means not having a whole file on journal-reload doesn't mean we
have to invalidate the transaction </p>
*
*/
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion
callback, boolean lineUpContext) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1392,10 +1375,7 @@
public void appendRollbackRecord(final long txID, final boolean sync, final
IOCompletion callback) throws Exception
{
- if (state != JournalImpl.STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ checkJournalIsLoaded();
compactingLock.readLock().lock();
@@ -1566,16 +1546,16 @@
return info;
}
-
-
+
+
public void testCompact() throws Exception
{
final AtomicInteger errors = new AtomicInteger(0);
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
compactorRunning.set(true);
-
+
// We can't use the executor for the compacting... or we would dead lock
because of file open and creation
// operations (that will use the executor)
compactorExecutor.execute(new Runnable()
@@ -1599,14 +1579,14 @@
}
}
});
-
+
try
{
if (!latch.await(60, TimeUnit.SECONDS))
{
throw new RuntimeException("Didn't finish compact timely");
}
-
+
if (errors.get() > 0)
{
throw new RuntimeException("Error during testCompact, look at the
logs");
@@ -1619,12 +1599,12 @@
}
/**
- *
+ *
* Note: This method can't be called from the main executor, as it will invoke
other methods depending on it.
- *
+ *
* Note: only synchronized methods on journal are methods responsible for the
life-cycle such as stop, start
* records will still come as this is being executed
- *
+ *
*/
protected synchronized void compact() throws Exception
{
@@ -1657,7 +1637,7 @@
compactingLock.writeLock().lock();
try
{
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
return;
}
@@ -1837,9 +1817,9 @@
}
- /**
+ /**
* <p>Load data accordingly to the record layouts</p>
- *
+ *
* <p>Basic record layout:</p>
* <table border=1>
* <tr><td><b>Field
Name</b></td><td><b>Size</b></td></tr>
@@ -1853,9 +1833,9 @@
* <tr><td>RecordBody</td><td>Byte Array
(size=BodySize)</td></tr>
* <tr><td>Check Size</td><td>Integer (4
bytes)</td></tr>
* </table>
- *
+ *
* <p> The check-size is used to validate if the record is valid and complete
</p>
- *
+ *
* <p>Commit/Prepare record layout:</p>
* <table border=1>
* <tr><td><b>Field
Name</b></td><td><b>Size</b></td></tr>
@@ -1870,18 +1850,18 @@
* <tr><td>* NumberOfElements(n)</td><td>Integer (4
bytes)</td></tr>
* <tr><td>CheckSize</td><td>Integer (4
bytes)</td</tr>
* </table>
- *
- * <p> * FileID and NumberOfElements are the transaction summary, and they will
be repeated (N)umberOfFiles times </p>
- *
+ *
+ * <p> * FileID and NumberOfElements are the transaction summary, and they will
be repeated (N)umberOfFiles times </p>
+ *
* */
public JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
{
return load(loadManager, true);
}
-
+
public synchronized JournalLoadInformation load(final LoaderCallback loadManager,
boolean fixFailingTransactions) throws Exception
{
- if (state != JournalImpl.STATE_STARTED)
+ if (state != JournalState.STARTED)
{
throw new IllegalStateException("Journal must be in started state");
}
@@ -2197,7 +2177,7 @@
filesRepository.pushOpenedFile();
- state = JournalImpl.STATE_LOADED;
+ state = JournalState.LOADED;
for (TransactionHolder transaction : loadTransactions.values())
{
@@ -2241,7 +2221,7 @@
return new JournalLoadInformation(records.size(), maxID.longValue());
}
- /**
+ /**
* @return true if cleanup was called
*/
public boolean checkReclaimStatus() throws Exception
@@ -2311,7 +2291,7 @@
return;
}
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
return;
}
@@ -2528,12 +2508,12 @@
public synchronized boolean isStarted()
{
- return state != JournalImpl.STATE_STOPPED;
+ return state != JournalState.STOPPED;
}
public synchronized void start()
{
- if (state != JournalImpl.STATE_STOPPED)
+ if (state != JournalState.STOPPED)
{
throw new IllegalStateException("Journal is not stopped");
}
@@ -2560,14 +2540,14 @@
fileFactory.start();
- state = JournalImpl.STATE_STARTED;
+ state = JournalState.STARTED;
}
public synchronized void stop() throws Exception
{
JournalImpl.trace("Stopping the journal");
- if (state == JournalImpl.STATE_STOPPED)
+ if (state == JournalState.STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
}
@@ -2577,7 +2557,7 @@
try
{
- state = JournalImpl.STATE_STOPPED;
+ state = JournalState.STOPPED;
compactorExecutor.shutdown();
@@ -2722,9 +2702,9 @@
/**
* <p> Check for holes on the transaction (a commit written but with an
incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete
as stated on the COMMIT-RECORD.</p>
- *
- * <p>Look at the javadoc on {@link JournalImpl#appendCommitRecord(long)} about
how the transaction-summary is recorded</p>
- *
+ *
+ * <p>Look at the javadoc on {@link JournalImpl#appendCommitRecord(long)} about
how the transaction-summary is recorded</p>
+ *
* @param journalTransaction
* @param orderedFiles
* @param recordedSummary
@@ -2895,8 +2875,8 @@
buffer.writeLong(fileID);
}
- /**
- *
+ /**
+ *
* @param completeTransaction If the appendRecord is for a prepare or commit, where we
should update the number of pendingTransactions on the current file
* */
private JournalFile appendRecord(final JournalInternalRecord encoder,
@@ -2905,7 +2885,7 @@
final JournalTransaction tx,
final IOAsyncTask parameterCallback) throws
Exception
{
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
throw new IllegalStateException("The journal is not loaded " +
state);
}
@@ -3006,7 +2986,7 @@
private void scheduleReclaim()
{
- if (state != JournalImpl.STATE_LOADED)
+ if (state != JournalState.LOADED)
{
return;
}
@@ -3239,6 +3219,7 @@
this.pages = pages;
}
+ @Override
public void run()
{
try