Author: clebert.suconic(a)jboss.com
Date: 2009-10-02 17:07:23 -0400 (Fri, 02 Oct 2009)
New Revision: 8039
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java
branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-02
17:15:12 UTC (rev 8038)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-02
21:07:23 UTC (rev 8039)
@@ -86,22 +86,5 @@
void perfBlast(int pages) throws Exception;
- /** This method is called automatically when a new file is opened.
- * @return true if it needs to re-check due to cleanup or other factors */
- boolean checkReclaimStatus() throws Exception;
- /** This method check for the need of compacting based on the minCompactPercentage
- * This method is usually called automatically when new files are opened
- */
- void checkCompact() throws Exception;
-
- /**
- * Eliminate deleted records of the journal.
- * @throws Exception
- */
- void compact() throws Exception;
-
-
- JournalFile[] getDataFiles();
-
}
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java 2009-10-02
17:15:12 UTC (rev 8038)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java 2009-10-02
21:07:23 UTC (rev 8039)
@@ -13,6 +13,7 @@
package org.hornetq.core.journal;
+import org.hornetq.core.journal.impl.JournalFile;
/**
*
@@ -52,6 +53,14 @@
boolean isAutoReclaim();
+ void compact() throws Exception;
+ /** This method is called automatically when a new file is opened.
+ * @return true if it needs to re-check due to cleanup or other factors */
+ boolean checkReclaimStatus() throws Exception;
+
+ JournalFile[] getDataFiles();
+
+
}
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-02
17:15:12 UTC (rev 8038)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-02
21:07:23 UTC (rev 8039)
@@ -2159,7 +2159,7 @@
return (compactMinFiles * compactPercentage);
}
- public synchronized void cleanUp(final JournalFile file) throws Exception
+ private synchronized void cleanUp(final JournalFile file) throws Exception
{
if (state != STATE_LOADED)
{
@@ -2236,7 +2236,7 @@
}
- public void checkCompact() throws Exception
+ private void checkCompact() throws Exception
{
if (compactMinFiles == 0)
{
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
(rev 0)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java 2009-10-02
21:07:23 UTC (rev 8039)
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import java.util.List;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
+import org.hornetq.core.replication.ReplicationManager;
+
+/**
+ * A ReplicatedJournalImpl
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicatedJournalImpl implements Journal
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final ReplicationManager replicationManager;
+
+ private final Journal replicatedJournal;
+
+ private final byte journalID;
+
+ /**
+ * @param journaID
+ * @param replicatedJournal
+ * @param replicationManager
+ */
+ public ReplicatedJournalImpl(byte journaID, Journal replicatedJournal,
ReplicationManager replicationManager)
+ {
+ super();
+ this.journalID = journaID;
+ this.replicatedJournal = replicatedJournal;
+ this.replicationManager = replicationManager;
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
+ */
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception
+ {
+ replicationManager.appendAddRecord(journalID, id, recordType, new
ByteArrayEncoding(record));
+ replicatedJournal.appendAddRecord(id, recordType, record, sync);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception
+ {
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ replicatedJournal.appendAddRecord(id, recordType, record, sync);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long,
byte, byte[])
+ */
+ public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[]
record) throws Exception
+ {
+ replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType,
new ByteArrayEncoding(record));
+ replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long,
byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendAddRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record) throws Exception
+ {
+ replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType,
record);
+ replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ }
+
+ /**
+ * @param txID
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
+ */
+ public void appendCommitRecord(long txID, boolean sync) throws Exception
+ {
+ replicationManager.appendCommitRecord(journalID, txID);
+ replicatedJournal.appendCommitRecord(txID, sync);
+ }
+
+ /**
+ * @param id
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
+ */
+ public void appendDeleteRecord(long id, boolean sync) throws Exception
+ {
+ replicationManager.appendDeleteRecord(journalID, id);
+ replicatedJournal.appendDeleteRecord(id, sync);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long,
byte[])
+ */
+ public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws
Exception
+ {
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id, new
ByteArrayEncoding(record));
+ replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long,
org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
+ {
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
+ replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
+ */
+ public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+ {
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+ replicatedJournal.appendDeleteRecordTransactional(txID, id);
+ }
+
+ /**
+ * @param txID
+ * @param transactionData
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync)
throws Exception
+ {
+ replicationManager.appendPrepareRecord(journalID, txID, new
ByteArrayEncoding(transactionData));
+ replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+ }
+
+ /**
+ * @param txID
+ * @param transactionData
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long,
org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync) throws Exception
+ {
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+ }
+
+ /**
+ * @param txID
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
+ */
+ public void appendRollbackRecord(long txID, boolean sync) throws Exception
+ {
+ replicationManager.appendRollbackRecord(journalID, txID);
+ replicatedJournal.appendRollbackRecord(txID, sync);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[],
boolean)
+ */
+ public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception
+ {
+ replicationManager.appendUpdateRecord(journalID, id, recordType, new
ByteArrayEncoding(record));
+ replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendUpdateRecord(long id, byte recordType, EncodingSupport record,
boolean sync) throws Exception
+ {
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long,
byte, byte[])
+ */
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
byte[] record) throws Exception
+ {
+ replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType,
new ByteArrayEncoding(record));
+ replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long,
byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record) throws Exception
+ {
+ replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType,
record);
+ replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ }
+
+ /**
+ * @param committedRecords
+ * @param preparedTransactions
+ * @param transactionFailure
+ * @return
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List,
org.hornetq.core.journal.TransactionFailureCallback)
+ */
+ public long load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions,
+ TransactionFailureCallback transactionFailure) throws Exception
+ {
+ return replicatedJournal.load(committedRecords, preparedTransactions,
transactionFailure);
+ }
+
+ /**
+ * @param reloadManager
+ * @return
+ * @throws Exception
+ * @see
org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
+ */
+ public long load(LoaderCallback reloadManager) throws Exception
+ {
+ return replicatedJournal.load(reloadManager);
+ }
+
+ /**
+ * @param pages
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#perfBlast(int)
+ */
+ public void perfBlast(int pages) throws Exception
+ {
+ replicatedJournal.perfBlast(pages);
+ }
+
+ /**
+ * @throws Exception
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+ replicatedJournal.start();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ replicatedJournal.stop();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getAlignment()
+ */
+ public int getAlignment() throws Exception
+ {
+ return replicatedJournal.getAlignment();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+ return replicatedJournal.isStarted();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-02
17:15:12 UTC (rev 8038)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-02
21:07:23 UTC (rev 8039)
@@ -66,7 +66,7 @@
private boolean started;
- private boolean playedResponsesOnFailure;
+ private volatile boolean enabled;
private final Object replicationLock = new Object();
@@ -100,80 +100,120 @@
public void appendAddRecord(final byte journalID, final long id, final byte
recordType, final EncodingSupport record)
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType,
record));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType,
record));
+ }
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long,
byte, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendUpdateRecord(byte journalID, long id, byte recordType,
EncodingSupport record) throws Exception
+ public void appendUpdateRecord(final byte journalID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record) throws Exception
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType,
record));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType,
record));
+ }
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long,
boolean)
*/
- public void appendDeleteRecord(byte journalID, long id) throws Exception
+ public void appendDeleteRecord(final byte journalID, final long id) throws Exception
{
- sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
+ }
}
- public void appendAddRecordTransactional(byte journalID, long txID, long id, byte
recordType, EncodingSupport record) throws Exception
+ public void appendAddRecordTransactional(final byte journalID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record) throws
Exception
{
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id,
recordType, record));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id,
recordType, record));
+ }
}
/* (non-Javadoc)
* @see
org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte,
long, long, byte, org.hornetq.core.journal.EncodingSupport)
*/
- public void appendUpdateRecordTransactional(byte journalID,
- long txID,
- long id,
- byte recordType,
- EncodingSupport record) throws Exception
+ public void appendUpdateRecordTransactional(final byte journalID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record) throws
Exception
{
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id,
recordType, record));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id,
recordType, record));
+ }
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long,
boolean)
*/
- public void appendCommitRecord(byte journalID, long txID) throws Exception
+ public void appendCommitRecord(final byte journalID, final long txID) throws
Exception
{
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+ }
}
/* (non-Javadoc)
* @see
org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte,
long, long, org.hornetq.core.journal.EncodingSupport)
*/
- public void appendDeleteRecordTransactional(byte journalID, long txID, long id,
EncodingSupport record) throws Exception
+ public void appendDeleteRecordTransactional(final byte journalID,
+ final long txID,
+ final long id,
+ final EncodingSupport record) throws
Exception
{
- sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id,
record));
+ }
}
/* (non-Javadoc)
* @see
org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte,
long, long)
*/
- public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws
Exception
+ public void appendDeleteRecordTransactional(final byte journalID, final long txID,
final long id) throws Exception
{
- sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id,
NullEncoding.instance));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id,
NullEncoding.instance));
+ }
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte,
long, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendPrepareRecord(byte journalID, long txID, EncodingSupport
transactionData) throws Exception
+ public void appendPrepareRecord(final byte journalID, final long txID, final
EncodingSupport transactionData) throws Exception
{
- sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID,
transactionData));
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID,
transactionData));
+ }
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte,
long, boolean)
*/
- public void appendRollbackRecord(byte journalID, long txID) throws Exception
+ public void appendRollbackRecord(final byte journalID, final long txID) throws Exception
{
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+ if (enabled)
+ {
+ // Replicates the rollback of txID; nothing is sent while replication is disabled.
+ // appendCommitRecord sends (journalID, false, txID); sending the same packet here would
+ // make a rollback indistinguishable from a commit, so the flag is set for the rollback.
+ // NOTE(review): assumes the boolean argument is the rollback marker - confirm in ReplicationCommitMessage.
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, true, txID));
+ }
}
/* (non-Javadoc)
@@ -181,7 +221,7 @@
*/
public synchronized boolean isStarted()
{
- return this.started;
+ return started;
}
/* (non-Javadoc)
@@ -195,16 +235,18 @@
Channel mainChannel = connection.getChannel(1, -1, false);
- this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+ replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
- this.replicatingChannel.setHandler(this.responseHandler);
+ replicatingChannel.setHandler(responseHandler);
CreateReplicationSessionMessage replicationStartPackage = new
CreateReplicationSessionMessage(channelID,
WINDOW_SIZE);
mainChannel.sendBlocking(replicationStartPackage);
- this.started = true;
+ started = true;
+
+ enabled = true;
}
/* (non-Javadoc)
@@ -217,7 +259,7 @@
replicatingChannel.close();
}
- this.started = false;
+ started = false;
if (connection != null)
{
@@ -242,7 +284,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
*/
- public void addReplicationAction(Runnable runnable)
+ public void addReplicationAction(final Runnable runnable)
{
getReplicationToken().addReplicationAction(runnable);
}
@@ -267,7 +309,7 @@
});
}
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
@@ -276,7 +318,6 @@
return activeTokens;
}
-
private void sendReplicatePacket(final Packet packet)
{
boolean runItNow = false;
@@ -286,7 +327,7 @@
synchronized (replicationLock)
{
- if (playedResponsesOnFailure)
+ if (!enabled)
{
// Already replicating channel failed, so just play the action now
@@ -336,7 +377,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
- public void handlePacket(Packet packet)
+ public void handlePacket(final Packet packet)
{
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
{
@@ -366,5 +407,4 @@
}
-
}
Modified:
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-02
17:15:12 UTC (rev 8038)
+++
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-02
21:07:23 UTC (rev 8039)
@@ -34,6 +34,11 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
@@ -42,6 +47,7 @@
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.impl.ReplicatedJournalImpl;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQServerImpl;
@@ -147,21 +153,23 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager,
executor);
manager.start();
- manager.appendPrepareRecord((byte)0, 100, new FakeData());
+ Journal replicatedJournal = new ReplicatedJournalImpl((byte)1, new
FakeJournal(), manager);
- manager.appendAddRecord((byte)0, 1, (byte)1, new FakeData());
- manager.appendUpdateRecord((byte)0, 1, (byte)2, new FakeData());
- manager.appendDeleteRecord((byte)0, 1);
- manager.appendAddRecordTransactional((byte)0, 2, 2, (byte)1, new FakeData());
- manager.appendUpdateRecordTransactional((byte)0, 2, 2, (byte)2, new
FakeData());
- manager.appendCommitRecord((byte)0, 2);
+ replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
- manager.appendDeleteRecordTransactional((byte)0, 3, 4, new FakeData());
- manager.appendPrepareRecord((byte)0, 3, new FakeData());
- manager.appendRollbackRecord((byte)0, 3);
+ replicatedJournal.appendAddRecord(1, (byte)1, new FakeData(), false);
+ replicatedJournal.appendUpdateRecord(1, (byte)2, new FakeData(), false);
+ replicatedJournal.appendDeleteRecord(1, false);
+ replicatedJournal.appendAddRecordTransactional(2, 2, (byte)1, new FakeData());
+ replicatedJournal.appendUpdateRecordTransactional(2, 2, (byte)2, new
FakeData());
+ replicatedJournal.appendCommitRecord(2, false);
+ replicatedJournal.appendDeleteRecordTransactional(3, 4, new FakeData());
+ replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
+ replicatedJournal.appendRollbackRecord(3, false);
+
final CountDownLatch latch = new CountDownLatch(1);
- manager.getReplicationToken().addReplicationAction(new Runnable()
+ manager.addReplicationAction(new Runnable()
{
public void run()
@@ -172,10 +180,10 @@
});
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertEquals(1, manager.getActiveTokens().size());
-
+
manager.completeToken();
-
- for (int i = 0 ; i < 100; i++)
+
+ for (int i = 0; i < 100; i++)
{
// This is asynchronous. Have to wait completion
if (manager.getActiveTokens().size() == 0)
@@ -292,7 +300,7 @@
connectionManager = null;
scheduledExecutor = null;
-
+
super.tearDown();
}
@@ -300,4 +308,199 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+ static class FakeJournal implements Journal
+ {
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[],
boolean)
+ */
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record,
boolean sync) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long,
byte, byte[])
+ */
+ public void appendAddRecordTransactional(long txID, long id, byte recordType,
byte[] record) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long,
byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendAddRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
+ */
+ public void appendCommitRecord(long txID, boolean sync) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
+ */
+ public void appendDeleteRecord(long id, boolean sync) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long,
long, byte[])
+ */
+ public void appendDeleteRecordTransactional(long txID, long id, byte[] record)
throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long,
long, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long,
long)
+ */
+ public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long,
org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[],
boolean)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync)
throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
+ */
+ public void appendRollbackRecord(long txID, boolean sync) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[],
boolean)
+ */
+ public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean
sync) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendUpdateRecord(long id, byte recordType, EncodingSupport record,
boolean sync) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long,
long, byte, byte[])
+ */
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
byte[] record) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long,
long, byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getAlignment()
+ */
+ public int getAlignment() throws Exception
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
+ */
+ public long load(LoaderCallback reloadManager) throws Exception
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List,
org.hornetq.core.journal.TransactionFailureCallback)
+ */
+ public long load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions,
+ TransactionFailureCallback transactionFailure) throws Exception
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#perfBlast(int)
+ */
+ public void perfBlast(int pages) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+
+ }
+
+ }
}