Author: borges
Date: 2011-08-05 11:01:37 -0400 (Fri, 05 Aug 2011)
New Revision: 11142
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 _Initial_ support for replication during sync.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-05
15:00:17 UTC (rev 11141)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-05
15:01:37 UTC (rev 11142)
@@ -218,6 +218,8 @@
private final Map<SimpleString, PersistedAddressSetting>
mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString,
PersistedAddressSetting>();
+ private final boolean hasCallbackSupport;
+
public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory)
{
this(config, executorFactory, null);
@@ -304,8 +306,8 @@
{
throw new IllegalArgumentException("Unsupported journal type " +
config.getJournalType());
}
+ hasCallbackSupport = journalFF.isSupportsCallbacks();
-
idGenerator = new BatchingIDGenerator(0,
JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -3480,4 +3482,9 @@
journal.stop();
}
+ public boolean hasCallbackSupport()
+ {
+ return hasCallbackSupport;
+ }
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-05
15:00:17 UTC (rev 11141)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-05
15:01:37 UTC (rev 11142)
@@ -456,7 +456,8 @@
JournalImpl journal = assertJournalImpl(journalIf);
Map<Long, JournalFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(),
mapToFill);
- registerJournal(packet.getJournalContentType().typeByte, new
ReplicatingJournal(current));
+ registerJournal(packet.getJournalContentType().typeByte,
+ new ReplicatingJournal(current, storage.hasCallbackSupport()));
}
// XXX HORNETQ-720 really need to do away with this once the method calls get stable.
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-05
15:00:17 UTC (rev 11141)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-05
15:01:37 UTC (rev 11142)
@@ -1,7 +1,11 @@
package org.hornetq.core.journal.impl;
import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
@@ -10,6 +14,9 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
/**
* Journal used at a replicating backup server during the synchronization of data with
the 'live'
@@ -19,69 +26,120 @@
*/
public class ReplicatingJournal implements Journal
{
+ private final ReentrantLock lockAppend = new ReentrantLock();
+ private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private final JournalFile file;
+ private final boolean hasCallbackSupport;
/**
* @param file
*/
- public ReplicatingJournal(JournalFile file)
+ public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
{
this.file = file;
+ this.hasCallbackSupport = hasCallbackSupport;
}
@Override
public void start() throws Exception
{
- // TODO Auto-generated method stub
-
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void stop() throws Exception
{
- // TODO Auto-generated method stub
-
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public boolean isStarted()
{
- // TODO Auto-generated method stub
- return false;
+ throw new UnsupportedOperationException();
}
+ // ------------------------
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception
{
- throw new UnsupportedOperationException();
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback)
-
throws Exception
+ throws Exception
{
- throw new UnsupportedOperationException();
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync,
completionCallback);
}
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception
{
- throw new UnsupportedOperationException();
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendAddRecord(id, recordType, record, sync, callback);
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
}
+ // ------------------------
+
+ private void readLockJournal()
+ {
+ journalLock.readLock().lock();
+ }
+
+ private void readUnlockJournal()
+ {
+ journalLock.readLock().unlock();
+ }
+
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync,
- IOCompletion completionCallback) throws Exception
+ IOCompletion callback) throws Exception
{
- throw new UnsupportedOperationException();
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType,
record);
+ if (callback != null)
+ {
+ callback.storeLineUp();
+ }
+
+ lockAppend.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ }
+ finally
+ {
+ lockAppend.unlock();
+ }
+
}
+ /**
+ * @param addRecord
+ * @param b
+ * @param sync
+ * @param object
+ * @param callback
+ * @return
+ */
+ private JournalFile appendRecord(JournalInternalRecord addRecord, boolean b, boolean
sync, Object object,
+ IOCompletion callback)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
@Override
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
@@ -89,146 +147,145 @@
appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record,
boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record,
boolean sync,
- IOCompletion completionCallback) throws Exception
+ IOCompletion completionCallback) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecord(long id, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[]
record) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
byte[] record) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws
Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendCommitRecord(long txID, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
- public void
- appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext) throws Exception
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext)
+ throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
- public void
- appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion callback)
-
throws Exception
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion callback)
+ throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendRollbackRecord(long txID, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public JournalLoadInformation loadInternalOnly() throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
@@ -242,13 +299,13 @@
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public int getAlignment() throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
@@ -274,4 +331,17 @@
{
throw new UnsupportedOperationException();
}
+
+ private SyncIOCompletion getSyncCallback(final boolean sync)
+ {
+ if (hasCallbackSupport)
+ {
+ if (sync)
+ {
+ return new SimpleWaitIOCallback();
+ }
+ return DummyCallback.getInstance();
+ }
+ return null;
+ }
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05
15:00:17 UTC (rev 11141)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05
15:01:37 UTC (rev 11142)
@@ -72,8 +72,9 @@
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
+ handler.deliver = true;
+ sendMessages(session, producer, 1);
- handler.notifyAll();
waitForBackup(sessionFactory, 10, true);
Set<Long> liveIds = getFileIds(messageJournal);
@@ -199,7 +200,6 @@
public BackupSyncDelay(ReplicationChannelHandler handler)
{
this.handler = handler;
- // TODO Auto-generated constructor stub
}
@Override
@@ -228,6 +228,8 @@
{
private ChannelHandler handler;
+ private Packet onHold;
+ public volatile boolean deliver;
public void addSubHandler(ChannelHandler handler)
{
@@ -237,21 +239,17 @@
@Override
public void handlePacket(Packet packet)
{
- System.out.println(packet);
+ if (onHold != null && deliver)
+ {
+ handler.handlePacket(onHold);
+ }
if (packet.getType() == PacketImpl.REPLICATION_SYNC)
{
ReplicationJournalFileMessage syncMsg =
(ReplicationJournalFileMessage)packet;
if (syncMsg.isUpToDate())
{
- // Hold the message that notifies the backup that sync is done.
- try
- {
- wait();
- }
- catch (InterruptedException e)
- {
- // no-op
- }
+ onHold = packet;
+ return;
}
}
handler.handlePacket(packet);