Author: clebert.suconic(a)jboss.com
Date: 2009-10-28 18:09:58 -0400 (Wed, 28 Oct 2009)
New Revision: 8159
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
some minor changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-28
18:32:19 UTC (rev 8158)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-28
22:09:58 UTC (rev 8159)
@@ -50,7 +50,7 @@
private final Journal journalTo;
/** Proxy mode means, everything will be copied over without any evaluations.
- * This is useful at the end of copying when everything needs to be copied. */
+ * This is used at the end of copying when everything needs to be copied. */
private boolean proxyMode = false;
// Static --------------------------------------------------------
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28
18:32:19 UTC (rev 8158)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28
22:09:58 UTC (rev 8159)
@@ -129,9 +129,9 @@
private final ReplicationManager replicator;
- private final Journal messageJournal;
+ private final ReplicatedJournal messageJournal;
- private final Journal bindingsJournal;
+ private final ReplicatedJournal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
@@ -193,14 +193,7 @@
"bindings",
1);
- if (replicator != null)
- {
- this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings,
localBindings, replicator);
- }
- else
- {
- this.bindingsJournal = localBindings;
- }
+ this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, localBindings,
replicator);
if (journalDir == null)
{
@@ -261,14 +254,7 @@
"hq",
config.getJournalMaxAIO());
- if (replicator != null)
- {
- this.messageJournal = new ReplicatedJournal((byte)1, localMessage, localMessage,
replicator);
- }
- else
- {
- this.messageJournal = localMessage;
- }
+ this.messageJournal = new ReplicatedJournal((byte)1, localMessage, localMessage,
replicator);
largeMessagesDirectory = config.getLargeMessagesDirectory();
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-28
18:32:19 UTC (rev 8158)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-28
22:09:58 UTC (rev 8159)
@@ -46,7 +46,7 @@
private static final boolean trace = log.isTraceEnabled();
- private final ReplicationManager replicationManager;
+ private volatile ReplicationManager replicationManager;
private final Journal localJournal;
@@ -54,6 +54,7 @@
private final byte journalID;
+ /** The server is started with the backup */
public ReplicatedJournal(final byte journaID,
final JournalLock journalLock,
final Journal localJournal,
@@ -66,6 +67,7 @@
this.journalLock = journalLock;
}
+ /** Used by the backup process, to send the backup over */
public ReplicatedJournal(final byte journaID, final ReplicationManager
replicationManager)
{
super();
@@ -115,7 +117,10 @@
preAppend();
try
{
- replicationManager.appendAddRecord(journalID, id, recordType, record);
+ if (replicationManager != null)
+ {
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ }
if (localJournal != null)
{
localJournal.appendAddRecord(id, recordType, record, sync);
@@ -160,7 +165,10 @@
preAppend();
try
{
- replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType,
record);
+ if (replicationManager != null)
+ {
+ replicationManager.appendAddRecordTransactional(journalID, txID, id,
recordType, record);
+ }
if (localJournal != null)
{
localJournal.appendAddRecordTransactional(txID, id, recordType, record);
@@ -188,7 +196,10 @@
preAppend();
try
{
- replicationManager.appendCommitRecord(journalID, txID);
+ if (replicationManager != null)
+ {
+ replicationManager.appendCommitRecord(journalID, txID);
+ }
if (localJournal != null)
{
localJournal.appendCommitRecord(txID, sync);
@@ -216,7 +227,10 @@
preAppend();
try
{
- replicationManager.appendDeleteRecord(journalID, id);
+ if (replicationManager != null)
+ {
+ replicationManager.appendDeleteRecord(journalID, id);
+ }
if (localJournal != null)
{
localJournal.appendDeleteRecord(id, sync);
@@ -257,7 +271,10 @@
preAppend();
try
{
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id,
record);
+ if (replicationManager != null)
+ {
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id,
record);
+ }
if (localJournal != null)
{
localJournal.appendDeleteRecordTransactional(txID, id, record);
@@ -285,7 +302,10 @@
preAppend();
try
{
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+ if (replicationManager != null)
+ {
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+ }
if (localJournal != null)
{
localJournal.appendDeleteRecordTransactional(txID, id);
@@ -326,7 +346,10 @@
preAppend();
try
{
- replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ if (replicationManager != null)
+ {
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ }
if (localJournal != null)
{
localJournal.appendPrepareRecord(txID, transactionData, sync);
@@ -354,7 +377,10 @@
preAppend();
try
{
- replicationManager.appendRollbackRecord(journalID, txID);
+ if (replicationManager != null)
+ {
+ replicationManager.appendRollbackRecord(journalID, txID);
+ }
if (localJournal != null)
{
localJournal.appendRollbackRecord(txID, sync);
@@ -397,7 +423,10 @@
preAppend();
try
{
- replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ if (replicationManager != null)
+ {
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ }
if (localJournal != null)
{
localJournal.appendUpdateRecord(id, recordType, record, sync);
@@ -446,7 +475,10 @@
preAppend();
try
{
- replicationManager.appendUpdateRecordTransactional(journalID, txID, id,
recordType, record);
+ if (replicationManager != null)
+ {
+ replicationManager.appendUpdateRecordTransactional(journalID, txID, id,
recordType, record);
+ }
if (localJournal != null)
{
localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
@@ -591,6 +623,10 @@
*/
public void loadInternalOnly() throws Exception
{
+ if (localJournal != null)
+ {
+ localJournal.loadInternalOnly();
+ }
}
// Package protected ---------------------------------------------
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-28
18:32:19 UTC (rev 8158)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-28
22:09:58 UTC (rev 8159)
@@ -317,6 +317,13 @@
{
connection = failoverManager.getConnection();
+ if (connection == null)
+ {
+ log.warn("Backup server MUST be started before live server. Initialisation
will not proceed.");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Backup server MUST be started before live
server. Initialisation will not proceed.");
+ }
+
long channelID = connection.generateChannelID();
Channel mainChannel = connection.getChannel(1, -1, false);
@@ -357,15 +364,15 @@
public void stop() throws Exception
{
enabled = false;
-
+
for (ReplicationContext ctx : activeContexts)
{
ctx.complete();
ctx.flush();
}
-
+
activeContexts.clear();
-
+
if (replicatingChannel != null)
{
replicatingChannel.close();
Modified:
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-28
18:32:19 UTC (rev 8158)
+++
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-28
22:09:58 UTC (rev 8159)
@@ -291,7 +291,6 @@
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
new FakeJournal(), manager);
- Thread.sleep(100);
TestInterceptor.value.set(false);
for (int i = 0; i < 500; i++)
@@ -320,24 +319,20 @@
}
}
- /**
- * @param manager
- * @return
- */
- private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+ public void testNoServer() throws Exception
{
- final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ FailoverManager failoverManager = createFailoverManager();
+
+ try
{
-
- public void run()
- {
- latch.countDown();
- }
-
- });
-
- assertTrue(latch.await(30, TimeUnit.SECONDS));
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
executor);
+ manager.start();
+ fail("Exception expected");
+ }
+ catch (HornetQException expected)
+ {
+ assertEquals(HornetQException.ILLEGAL_STATE , expected.getCode());
+ }
}
public void testNoActions() throws Exception
@@ -461,6 +456,26 @@
interceptors);
}
+ /**
+ * @param manager
+ * @return
+ */
+ private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ manager.afterReplicated(new Runnable()
+ {
+
+ public void run()
+ {
+ latch.countDown();
+ }
+
+ });
+
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
+
protected void tearDown() throws Exception
{