[hornetq-commits] JBoss hornetq SVN: r8060 - in branches/Replication_Clebert: src/main/org/hornetq/core/replication/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 6 22:30:02 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-06 22:30:02 -0400 (Tue, 06 Oct 2009)
New Revision: 8060
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
Log:
Fixes for replicating ID generation
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java 2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java 2009-10-07 02:30:02 UTC (rev 8060)
@@ -38,8 +38,6 @@
private static final Logger log = Logger.getLogger(BatchingIDGenerator.class);
- public static final byte ID_COUNTER_RECORD = 24;
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -127,7 +125,7 @@
{
try
{
- journalStorage.appendAddRecord(journalID, ID_COUNTER_RECORD, new IDCounterEncoding(id), true);
+ journalStorage.appendAddRecord(journalID, JournalStorageManager.ID_COUNTER_RECORD, new IDCounterEncoding(id), true);
}
catch (Exception e)
{
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-07 02:30:02 UTC (rev 8060)
@@ -94,6 +94,8 @@
public static final byte PERSISTENT_ID_RECORD = 23;
+ public static final byte ID_COUNTER_RECORD = 24;
+
// type + expiration + timestamp + priority
public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
@@ -235,9 +237,16 @@
{
throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
}
+
+ if (config.isBackup())
+ {
+ this.idGenerator = null;
+ }
+ else
+ {
+ this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
+ }
- this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
-
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
config.getJournalCompactMinFiles(),
@@ -1087,7 +1096,7 @@
persistentID = encoding.uuid;
}
- else if (rec == BatchingIDGenerator.ID_COUNTER_RECORD)
+ else if (rec == ID_COUNTER_RECORD)
{
idGenerator.loadState(record.id, buffer);
}
@@ -1131,7 +1140,10 @@
}
// Must call close to make sure last id is persisted
- idGenerator.close();
+ if (idGenerator != null)
+ {
+ idGenerator.close();
+ }
bindingsJournal.stop();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java 2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java 2009-10-07 02:30:02 UTC (rev 8060)
@@ -40,16 +40,20 @@
// Attributes ----------------------------------------------------
+ private static final boolean trace = false;
+
private final ReplicationManager replicationManager;
private final Journal replicatedJournal;
private final byte journalID;
- public ReplicatedJournalImpl(byte journaID, Journal replicatedJournal, ReplicationManager replicationManager)
+ public ReplicatedJournalImpl(final byte journaID,
+ final Journal replicatedJournal,
+ final ReplicationManager replicationManager)
{
super();
- this.journalID = journaID;
+ journalID = journaID;
this.replicatedJournal = replicatedJournal;
this.replicationManager = replicationManager;
}
@@ -67,7 +71,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
*/
- public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
{
this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@@ -80,9 +84,12 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+ public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
- System.out.println("Append record id = " + id + " recordType = " + recordType);
+ if (trace)
+ {
+ System.out.println("Append record id = " + id + " recordType = " + recordType);
+ }
replicationManager.appendAddRecord(journalID, id, recordType, record);
replicatedJournal.appendAddRecord(id, recordType, record, sync);
}
@@ -95,7 +102,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[])
*/
- public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+ public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
{
this.appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
@@ -108,9 +115,15 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
*/
- public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ public void appendAddRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record) throws Exception
{
- System.out.println("Append record TXid = " + id + " recordType = " + recordType);
+ if (trace)
+ {
+ System.out.println("Append record TXid = " + id + " recordType = " + recordType);
+ }
replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
}
@@ -121,9 +134,12 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
*/
- public void appendCommitRecord(long txID, boolean sync) throws Exception
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
- System.out.println("AppendCommit " + txID);
+ if (trace)
+ {
+ System.out.println("AppendCommit " + txID);
+ }
replicationManager.appendCommitRecord(journalID, txID);
replicatedJournal.appendCommitRecord(txID, sync);
}
@@ -134,12 +150,15 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
*/
- public void appendDeleteRecord(long id, boolean sync) throws Exception
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
{
- System.out.println("AppendDelete " + id);
+ if (trace)
+ {
+ System.out.println("AppendDelete " + id);
+ }
replicationManager.appendDeleteRecord(journalID, id);
replicatedJournal.appendDeleteRecord(id, sync);
- }
+ }
/**
* @param txID
@@ -148,7 +167,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[])
*/
- public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
{
this.appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
}
@@ -160,9 +179,12 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.hornetq.core.journal.EncodingSupport)
*/
- public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
{
- System.out.println("AppendDelete txID=" + txID + " id=" + id);
+ if (trace)
+ {
+ System.out.println("AppendDelete txID=" + txID + " id=" + id);
+ }
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
}
@@ -173,9 +195,12 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
*/
- public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
{
- System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
+ if (trace)
+ {
+ System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
+ }
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
replicatedJournal.appendDeleteRecordTransactional(txID, id);
}
@@ -187,7 +212,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
*/
- public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+ public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
{
this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
@@ -199,9 +224,12 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
{
- System.out.println("AppendPrepare txID=" + txID);
+ if (trace)
+ {
+ System.out.println("AppendPrepare txID=" + txID);
+ }
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
}
@@ -212,9 +240,12 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
*/
- public void appendRollbackRecord(long txID, boolean sync) throws Exception
+ public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
{
- System.out.println("AppendRollback " + txID);
+ if (trace)
+ {
+ System.out.println("AppendRollback " + txID);
+ }
replicationManager.appendRollbackRecord(journalID, txID);
replicatedJournal.appendRollbackRecord(txID, sync);
}
@@ -227,7 +258,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean)
*/
- public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
{
this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@@ -240,9 +271,12 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+ public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
- System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
+ if (trace)
+ {
+ System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
+ }
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
}
@@ -255,7 +289,10 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[])
*/
- public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+ public void appendUpdateRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final byte[] record) throws Exception
{
this.appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
@@ -268,9 +305,15 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
*/
- public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ public void appendUpdateRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record) throws Exception
{
- System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
+ if (trace)
+ {
+ System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
+ }
replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
}
@@ -283,9 +326,9 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
*/
- public long load(List<RecordInfo> committedRecords,
- List<PreparedTransactionInfo> preparedTransactions,
- TransactionFailureCallback transactionFailure) throws Exception
+ public long load(final List<RecordInfo> committedRecords,
+ final List<PreparedTransactionInfo> preparedTransactions,
+ final TransactionFailureCallback transactionFailure) throws Exception
{
return replicatedJournal.load(committedRecords, preparedTransactions, transactionFailure);
}
@@ -296,7 +339,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
*/
- public long load(LoaderCallback reloadManager) throws Exception
+ public long load(final LoaderCallback reloadManager) throws Exception
{
return replicatedJournal.load(reloadManager);
}
@@ -306,7 +349,7 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#perfBlast(int)
*/
- public void perfBlast(int pages) throws Exception
+ public void perfBlast(final int pages) throws Exception
{
replicatedJournal.perfBlast(pages);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-07 02:30:02 UTC (rev 8060)
@@ -31,7 +31,6 @@
import org.hornetq.core.server.HornetQServer;
/**
- * A ReplicationPacketHandler
*
* @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*
@@ -46,6 +45,8 @@
// Attributes ----------------------------------------------------
+ private static final boolean trace = false;
+
private final HornetQServer server;
private Channel channel;
@@ -55,13 +56,9 @@
private Journal messagingJournal;
private JournalStorageManager storage;
-
- private volatile boolean started;
- // Static --------------------------------------------------------
-
// Constructors --------------------------------------------------
- public ReplicationEndpointImpl(HornetQServer server)
+ public ReplicationEndpointImpl(final HornetQServer server)
{
this.server = server;
}
@@ -71,7 +68,7 @@
* (non-Javadoc)
* @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
- public void handlePacket(Packet packet)
+ public void handlePacket(final Packet packet)
{
try
{
@@ -127,13 +124,11 @@
storage = new JournalStorageManager(config, null);
storage.start();
- this.bindingsJournal = storage.getBindingsJournal();
- this.messagingJournal = storage.getMessageJournal();
+ bindingsJournal = storage.getBindingsJournal();
+ messagingJournal = storage.getMessageJournal();
// We only need to load internal structures on the backup...
storage.loadInternalOnly();
-
- started = true;
}
/* (non-Javadoc)
@@ -141,7 +136,6 @@
*/
public void stop() throws Exception
{
- started = false;
channel.close();
storage.stop();
}
@@ -157,7 +151,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
*/
- public void setChannel(Channel channel)
+ public void setChannel(final Channel channel)
{
this.channel = channel;
}
@@ -171,13 +165,12 @@
/**
* @param packet
*/
- private void handleCommitRollback(Packet packet) throws Exception
+ private void handleCommitRollback(final Packet packet) throws Exception
{
ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
Journal journalToUse = getJournal(commitMessage.getJournalID());
-
-
+
if (commitMessage.isRollback())
{
journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
@@ -191,7 +184,7 @@
/**
* @param packet
*/
- private void handlePrepare(Packet packet) throws Exception
+ private void handlePrepare(final Packet packet) throws Exception
{
ReplicationPrepareMessage prepareMessage = (ReplicationPrepareMessage)packet;
@@ -203,7 +196,7 @@
/**
* @param packet
*/
- private void handleAppendDeleteTX(Packet packet) throws Exception
+ private void handleAppendDeleteTX(final Packet packet) throws Exception
{
ReplicationDeleteTXMessage deleteMessage = (ReplicationDeleteTXMessage)packet;
@@ -217,7 +210,7 @@
/**
* @param packet
*/
- private void handleAppendDelete(Packet packet) throws Exception
+ private void handleAppendDelete(final Packet packet) throws Exception
{
ReplicationDeleteMessage deleteMessage = (ReplicationDeleteMessage)packet;
@@ -229,7 +222,7 @@
/**
* @param packet
*/
- private void handleAppendAddTXRecord(Packet packet) throws Exception
+ private void handleAppendAddTXRecord(final Packet packet) throws Exception
{
ReplicationAddTXMessage addMessage = (ReplicationAddTXMessage)packet;
@@ -255,7 +248,7 @@
* @param packet
* @throws Exception
*/
- private void handleAppendAddRecord(Packet packet) throws Exception
+ private void handleAppendAddRecord(final Packet packet) throws Exception
{
ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
@@ -263,7 +256,10 @@
if (addMessage.isUpdate())
{
- System.out.println("Endpoint appendUpdate id = " + addMessage.getId());
+ if (trace)
+ {
+ System.out.println("Endpoint appendUpdate id = " + addMessage.getId());
+ }
journalToUse.appendUpdateRecord(addMessage.getId(),
addMessage.getRecordType(),
addMessage.getRecordData(),
@@ -271,7 +267,10 @@
}
else
{
- System.out.println("Endpoint append id = " + addMessage.getId());
+ if (trace)
+ {
+ System.out.println("Endpoint append id = " + addMessage.getId());
+ }
journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
}
}
@@ -280,7 +279,7 @@
* @param journalID
* @return
*/
- private Journal getJournal(byte journalID)
+ private Journal getJournal(final byte journalID)
{
Journal journalToUse;
if (journalID == (byte)0)
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2009-10-07 02:30:02 UTC (rev 8060)
@@ -126,7 +126,6 @@
sf.setBlockOnNonPersistentSend(true);
sf.setBlockOnPersistentSend(true);
- sf.setBlockOnAcknowledge(true);
ClientSession createSession = sf.createSession(true, true);
@@ -246,8 +245,6 @@
{
break;
}
-
- System.out.println("Thread " + Thread.currentThread().getName() + " received " + message.getMessageID());
// There may be some missing or duplicate messages - but the order should be correct
@@ -257,7 +254,6 @@
lastCount = count;
- System.out.println("Client ACK: " + message.getMessageID());
message.acknowledge();
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-10-07 02:30:02 UTC (rev 8060)
@@ -23,6 +23,7 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.persistence.impl.journal.BatchingIDGenerator;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.tests.util.UnitTestCase;
@@ -142,7 +143,7 @@
for (RecordInfo record : records)
{
- if (record.userRecordType == BatchingIDGenerator.ID_COUNTER_RECORD)
+ if (record.userRecordType == JournalStorageManager.ID_COUNTER_RECORD)
{
HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(record.data);
batch.loadState(record.id, buffer);
More information about the hornetq-commits mailing list