Author: borges
Date: 2011-08-09 06:29:32 -0400 (Tue, 09 Aug 2011)
New Revision: 11159
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 Reload after sync is done.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-08-09
10:27:51 UTC (rev 11158)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-08-09
10:29:32 UTC (rev 11159)
@@ -570,6 +570,11 @@
localJournal.lineUpContex(callback);
}
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ return localJournal.loadSyncOnly();
+ }
// Package protected ---------------------------------------------
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09
10:27:51 UTC (rev 11158)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09
10:29:32 UTC (rev 11159)
@@ -81,7 +81,7 @@
private Channel channel;
private Journal[] journals;
- private JournalLoadInformation[] journalLoadInformation;
+ private final JournalLoadInformation[] journalLoadInformation = new
JournalLoadInformation[2];
/** Files reserved in each journal for synchronization of existing data from the
'live' server. */
private final Map<JournalContent, Map<Long, JournalFile>>
filesReservedForSync =
@@ -244,13 +244,11 @@
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- System.out.println("State? " + journalsHolder.get(jc));
filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+ // We only need to load internal structures on the backup...
+ journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
}
- // We only need to load internal structures on the backup...
- journalLoadInformation = storage.loadInternalOnly();
-
pageManager = new PagingManagerImpl(new
PagingStoreFactoryNIO(config.getPagingDirectory(),
config.getJournalBufferSize_NIO(),
server.getScheduledPool(),
@@ -395,7 +393,7 @@
{
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- JournalImpl journal = (JournalImpl)journalsHolder.get(jc);
+ JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
journal.writeLock();
try
{
@@ -404,10 +402,12 @@
throw new IllegalStateException("Journal should not have any data
files at this point");
}
// files should be already in place.
- filesReservedForSync.remove(msg.getJournalContent());
- registerJournal(jc.typeByte, journalsHolder.get(jc));
+ filesReservedForSync.remove(jc);
+ registerJournal(jc.typeByte, journal);
+ journal.loadInternalOnly();
// XXX HORNETQ-720 must reload journals
// XXX HORNETQ-720 must start using real journals
+
}
finally
{
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-08-09
10:27:51 UTC (rev 11158)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-08-09
10:29:32 UTC (rev 11159)
@@ -111,11 +111,18 @@
JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
- /** Load internal data structures and not expose any data.
- * This is only useful if you're using the journal but not interested on the
current data.
- * Useful in situations where the journal is being replicated, copied... etc. */
+ /**
+ * Load internal data structures and not expose any data. This is only useful if
you're using the
+ * journal but not interested on the current data. Useful in situations where the
journal is
+ * being replicated, copied... etc.
+ */
JournalLoadInformation loadInternalOnly() throws Exception;
+ /**
+ * Load internal data structures, and remain waiting for synchronization to complete.
+ */
+ JournalLoadInformation loadSyncOnly() throws Exception;
+
void lineUpContex(IOCompletion callback);
JournalLoadInformation load(List<RecordInfo> committedRecords,
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09
10:27:51 UTC (rev 11158)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09
10:29:32 UTC (rev 11159)
@@ -1298,35 +1298,39 @@
return fileFactory.getAlignment();
}
- public synchronized JournalLoadInformation loadInternalOnly() throws Exception
+ private static class DummyLoader implements LoaderCallback
{
- LoaderCallback dummyLoader = new LoaderCallback()
+ static final LoaderCallback INSTANCE = new DummyLoader();
+ public void failedTransaction(final long transactionID, final
List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete)
{
+ }
- public void failedTransaction(final long transactionID,
- final List<RecordInfo> records,
- final List<RecordInfo> recordsToDelete)
- {
- }
+ public void updateRecord(final RecordInfo info)
+ {
+ }
- public void updateRecord(final RecordInfo info)
- {
- }
+ public void deleteRecord(final long id)
+ {
+ }
- public void deleteRecord(final long id)
- {
- }
+ public void addRecord(final RecordInfo info)
+ {
+ }
- public void addRecord(final RecordInfo info)
- {
- }
+ public void addPreparedTransaction(final PreparedTransactionInfo
preparedTransaction)
+ {
+ }
+ }
- public void addPreparedTransaction(final PreparedTransactionInfo
preparedTransaction)
- {
- }
- };
+ public synchronized JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ return load(DummyLoader.INSTANCE, true, false);
+ }
- return this.load(dummyLoader, true, true);
+ public synchronized JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ return load(DummyLoader.INSTANCE, true, true);
}
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
@@ -1746,12 +1750,16 @@
private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
boolean fixFailingTransactions,
final boolean replicationSync) throws Exception
{
-
- if (state != JournalState.STARTED)
+ System.out.println("LOAD! " + state + " " + replicationSync);
+ if (state == JournalState.STOPPED || state == JournalState.LOADED)
{
throw new IllegalStateException("Journal " + this + " must be in
" + JournalState.STARTED + " state, was " +
state);
}
+ if (state == JournalState.SYNCING && replicationSync)
+ {
+ throw new IllegalStateException("Journal must be state " +
JournalState.STARTED);
+ }
checkControlFile();
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09
10:27:51 UTC (rev 11158)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09
10:29:32 UTC (rev 11159)
@@ -223,4 +223,10 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-09
10:27:51 UTC (rev 11158)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-09
10:29:32 UTC (rev 11159)
@@ -161,10 +161,6 @@
@Override
protected void tearDown() throws Exception
{
- if (handler != null)
- {
- handler.notifyAll();
- }
if (sessionFactory != null)
sessionFactory.close();
if (session != null)
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-08-09
10:27:51 UTC (rev 11158)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-08-09
10:29:32 UTC (rev 11159)
@@ -846,5 +846,16 @@
}
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadSyncOnly()
+ */
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
}