Author: borges
Date: 2011-08-10 06:28:45 -0400 (Wed, 10 Aug 2011)
New Revision: 11176
Added:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
Removed:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Rename ReplicatingJournal to FileWrapperJournal (as there was a ReplicatEDJournal already)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-10 01:53:32 UTC (rev 11175)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-10 10:28:45 UTC (rev 11176)
@@ -26,9 +26,9 @@
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.impl.FileWrapperJournal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.ReplicatingJournal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
@@ -441,7 +441,7 @@
/**
* Reserves files (with the given fileID) in the specified journal, and places a
- * {@link ReplicatingJournal} in place to store messages while synchronization is going on.
+ * {@link FileWrapperJournal} in place to store messages while synchronization is going on.
* @param packet
* @throws Exception
*/
@@ -458,8 +458,8 @@
JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(),
mapToFill);
current.getFile().open(1, false);
registerJournal(packet.getJournalContentType().typeByte,
- new ReplicatingJournal(current, storage.hasCallbackSupport()));
- }
+ new FileWrapperJournal(current, storage.hasCallbackSupport()));
+ }
// XXX HORNETQ-720 really need to do away with this once the method calls get stable.
private static JournalImpl assertJournalImpl(final Journal journalIf) throws
HornetQException
Copied:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java (from rev 11169, branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 10:28:45 UTC (rev 11176)
@@ -0,0 +1,229 @@
+package org.hornetq.core.journal.impl;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
+
+/**
+ * Journal used at a replicating backup server during the synchronization of data with the 'live'
+ * server. It just wraps a single {@link JournalFile}.
+ * <p>
+ * Its main purpose is to store the data as a Journal would, but without verifying records.
+ */
+public class FileWrapperJournal extends JournalBase implements Journal
+{
+ private final ReentrantLock lockAppend = new ReentrantLock();
+ // private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+
+ private final JournalFile currentFile;
+
+ /**
+ * @param file
+ */
+ public FileWrapperJournal(JournalFile file, boolean hasCallbackSupport)
+ {
+ super(hasCallbackSupport);
+ currentFile = file;
+ }
+
+ @Override
+ public void start() throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ currentFile.getFile().close();
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------------------------
+
+ // ------------------------
+
+// private void readLockJournal()
+// {
+// journalLock.readLock().lock();
+// }
+//
+// private void readUnlockJournal()
+// {
+// journalLock.readLock().unlock();
+// }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion callback)
+ throws Exception
+ {
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType,
record);
+
+ writeRecord(addRecord, sync, callback);
+ }
+
+ /**
+ * Write the record to the current file.
+ */
+ private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion
callback) throws Exception
+ {
+
+
+ lockAppend.lock();
+ try
+ {
+ if (callback != null)
+ {
+ callback.storeLineUp();
+ }
+
+ encoder.setFileID(currentFile.getRecordID());
+
+ if (callback != null)
+ {
+ currentFile.getFile().write(encoder, sync, callback);
+ }
+ else
+ {
+ currentFile.getFile().write(encoder, sync);
+ }
+ }
+ finally
+ {
+ lockAppend.unlock();
+ }
+ }
+
+ @Override
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendAddRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void
+ appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion callback)
+ throws Exception
+ {
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType,
record);
+ writeRecord(updateRecord, sync, callback);
+ }
+
+ @Override
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public void lineUpContex(IOCompletion callback)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure)
+ throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public int getAlignment() throws Exception
+ {
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ }
+
+ @Override
+ public int getNumberOfRecords()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getUserVersion()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void perfBlast(int pages)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void runDirectJournalBlast() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Deleted:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-10 01:53:32 UTC (rev 11175)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-10 10:28:45 UTC (rev 11176)
@@ -1,232 +0,0 @@
-package org.hornetq.core.journal.impl;
-
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.IOCompletion;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.LoaderCallback;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.TransactionFailureCallback;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
-
-/**
- * Journal used at a replicating backup server during the synchronization of data with the 'live'
- * server.
- * <p>
- * Its main purpose is to store the data as a Journal would, but without verifying records.
- */
-public class ReplicatingJournal extends JournalBase implements Journal
-{
- private final ReentrantLock lockAppend = new ReentrantLock();
- private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
-
- private final JournalFile currentFile;
-
- /**
- * @param file
- */
- public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
- {
- super(hasCallbackSupport);
- currentFile = file;
- }
-
- @Override
- public void start() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void stop() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public boolean isStarted()
- {
- throw new UnsupportedOperationException();
- }
-
- // ------------------------
-
- // ------------------------
-
- private void readLockJournal()
- {
- journalLock.readLock().lock();
- }
-
- private void readUnlockJournal()
- {
- journalLock.readLock().unlock();
- }
-
- @Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion callback)
- throws Exception
- {
- JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType,
record);
-
- writeRecord(addRecord, sync, callback);
-
- }
-
- /**
- * Write the record to the current file.
- */
- private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion
callback) throws Exception
- {
-
-
- lockAppend.lock();
- try
- {
- if (callback != null)
- {
- callback.storeLineUp();
- }
-
- encoder.setFileID(currentFile.getRecordID());
-
- if (callback != null)
- {
- currentFile.getFile().write(encoder, sync, callback);
- }
- else
- {
- currentFile.getFile().write(encoder, sync);
- }
- }
- finally
- {
- lockAppend.unlock();
- }
- }
-
- @Override
- public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendAddRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void
- appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion callback)
- throws Exception
- {
- JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType,
record);
- writeRecord(updateRecord, sync, callback);
- }
-
- @Override
- public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public JournalLoadInformation loadInternalOnly() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void lineUpContex(IOCompletion callback)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JournalLoadInformation load(List<RecordInfo> committedRecords,
- List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public int getAlignment() throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public int getNumberOfRecords()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getUserVersion()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void perfBlast(int pages)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void runDirectJournalBlast() throws Exception
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JournalLoadInformation loadSyncOnly() throws Exception
- {
- throw new UnsupportedOperationException();
- }
-}