Author: borges
Date: 2011-07-27 13:27:23 -0400 (Wed, 27 Jul 2011)
New Revision: 11055
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Some replication synchronization code (turned off as it does not work).
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -14,6 +14,7 @@
package org.hornetq.core.persistence.impl.journal;
import java.io.File;
+import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.security.AccessController;
@@ -157,6 +158,11 @@
private ReplicationManager replicator;
+ public enum JournalContent
+ {
+ MESSAGES, BINDINGS;
+ }
+
private Journal messageJournal;
private Journal bindingsJournal;
@@ -321,17 +327,76 @@
return replicator != null;
}
- public void setReplicator(ReplicationManager replicationManager)
+ /**
+ * XXX FIXME Method ignores the synchronization of LargeMessages and Paging.
+ * <p>
+ * XXX A second version improvement would be to allow new operations to be sent to the
+ * backup, while we synchronize the existing logs.
+ * @param replicationManager
+ * @throws HornetQException
+ */
+ public void setReplicator(ReplicationManager replicationManager) throws Exception
{
assert replicationManager != null;
replicator = replicationManager;
- Journal localMessageJournal = messageJournal;
- Journal localBindingsJournal = bindingsJournal;
+
+ if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof
JournalImpl))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "journals here are not JournalImpl. You
can't set a replicator!");
+ }
+ JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+ JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+ if (false)
+ {
+ localMessageJournal.writeLock();
+ localBindingsJournal.writeLock();
+
+ JournalFile[] messageFiles = prepateJournalForCopy(localMessageJournal);
+ JournalFile[] bindingsFiles = prepateJournalForCopy(localBindingsJournal);
+ localMessageJournal.writeUnlock();
+ localBindingsJournal.writeUnlock();
+
+ sendJournalFile(messageFiles, JournalContent.MESSAGES);
+ sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+ }
+ // XXX NEED to take a global lock on the StorageManager.
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
- // XXX HORNETQ-720 obviously missing here is the synchronization step.
}
+ /**
+ * Sends each of the given journal files, in full, to the replicating (backup) server.
+ * @param journalFiles
+ * @param type
+ * @throws IOException
+ * @throws HornetQException
+ */
+ private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws
IOException, HornetQException
+ {
+ for (JournalFile jf : journalFiles)
+ {
+ replicator.sendJournalFile(jf, type);
+ jf.setCanReclaim(true);
+ }
+ }
+
+ private JournalFile[] prepateJournalForCopy(JournalImpl journal) throws Exception
+ {
+ journal.setAutoReclaim(false);
+ /*
+ * need to check whether it is safe to proceed if compacting is running (especially
+ * at the end of it)
+ */
+ journal.forceMoveNextFile();
+ JournalFile[] datafiles = journal.getDataFiles();
+ for (JournalFile jf : datafiles)
+ {
+ jf.setCanReclaim(false);
+ }
+ return datafiles;
+ }
+
public void waitOnOperations() throws Exception
{
if (!started)
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -0,0 +1,29 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Used to copy JournalFile data over to the backup during synchronization.
+ */
+public final class ReplicationJournalFile extends PacketImpl
+{
+
+ private byte[] data;
+ private int dataSize;
+ private JournalContent journalType;
+
+ public ReplicationJournalFile()
+ {
+ super(REPLICATION_SYNC);
+ }
+
+ public ReplicationJournalFile(int size, byte[] data, JournalContent content)
+ {
+ this();
+ this.dataSize = size;
+ this.data = data;
+ this.journalType = content;
+ }
+
+}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -13,20 +13,23 @@
package org.hornetq.core.replication;
+import java.io.IOException;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.server.HornetQComponent;
/**
* Used by the {@link JournalStorageManager} to update the replicated journal.
- *
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public interface ReplicationManager extends HornetQComponent
@@ -84,4 +87,10 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+ /**
+ * Sends the whole content of the file to be duplicated.
+ * @throws HornetQException
+ */
+ void sendJournalFile(JournalFile jf, JournalContent type) throws IOException,
HornetQException;
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication.impl;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
@@ -24,9 +26,11 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
@@ -40,6 +44,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFile;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -463,8 +468,10 @@
private class ResponseHandler implements ChannelHandler
{
- /* (non-Javadoc)
- * @see
org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ /*
+ * (non-Javadoc)
+ * @see
+ *
org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
public void handlePacket(final Packet packet)
{
@@ -496,4 +503,19 @@
}
+ @Override
+ public void sendJournalFile(JournalFile jf, JournalContent content) throws
IOException, HornetQException
+ {
+ FileInputStream file = new FileInputStream(jf.getFile().getFileName());
+ byte[] data = new byte[1 << 17]; // about 130 kB
+ while (true)
+ {
+ int bytesRead = file.read(data);
+ if (bytesRead == -1)
+ break;
+ replicatingChannel.sendBlocking(new ReplicationJournalFile(bytesRead, data,
content));
+ }
+ throw new UnsupportedOperationException();
+ }
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -2003,10 +2003,12 @@
public void addHaBackup(CoreRemotingConnection rc) throws Exception
{
if (!(storageManager instanceof JournalStorageManager))
- return;
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "unknown
implementation of JournalStorageManager!");
+ }
+
JournalStorageManager journalStorageManager =
(JournalStorageManager)storageManager;
-
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -18,11 +18,11 @@
import org.hornetq.core.server.HornetQComponent;
/**
- *
+ *
* Most methods on the journal provide a blocking version where you select the sync mode
and a non blocking mode where you pass a completion callback as a parameter.
- *
+ *
* Notice also that even on the callback methods it's possible to pass the sync mode.
That will only make sense on the NIO operations.
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -84,13 +84,13 @@
*/
void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext) throws Exception;
- /**
- *
- * <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
+ /**
+ *
+ * <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
* back to a state it could be committed. </p>
- *
+ *
* <p> transactionData allows you to store any other supporting user-data
related to the transaction</p>
- *
+ *
* @param txID
* @param transactionData - extra user data for the prepare
* @throws Exception
@@ -115,7 +115,7 @@
* This is only useful if you're using the journal but not interested on the
current data.
* Useful in situations where the journal is being replicated, copied... etc. */
JournalLoadInformation loadInternalOnly() throws Exception;
-
+
void lineUpContex(IOCompletion callback);
JournalLoadInformation load(List<RecordInfo> committedRecords,
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -51,12 +51,9 @@
void setAutoReclaim(boolean autoReclaim);
- boolean isAutoReclaim();
-
void testCompact() throws Exception;
JournalFile getCurrentFile();
-
/** This method is called automatically when a new file is opened.
* @return true if it needs to re-check due to cleanup or other factors */
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -104,16 +104,16 @@
file.delete();
}
-
+
public void copyTo(SequentialFile newFileName) throws Exception
{
log.debug("Copying " + this + " as " + newFileName);
newFileName.open();
this.open();
-
-
+
+
ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
-
+
for (;;)
{
buffer.rewind();
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -321,10 +321,10 @@
return openedFiles.size();
}
- /**
+ /**
* <p>This method will instantly return the opened file, and schedule opening
and reclaiming.</p>
* <p>In case there are no cached opened files, this method will block until the
file was opened,
- * what would happen only if the system is under heavy load by another system (like a
backup system, or a DB sharing the same box as HornetQ).</p>
+ * what would happen only if the system is under heavy load by another system (like a
backup system, or a DB sharing the same box as HornetQ).</p>
* */
public JournalFile openFile() throws InterruptedException
{
@@ -377,8 +377,8 @@
return nextFile;
}
- /**
- *
+ /**
+ *
* Open a file and place it into the openedFiles queue
* */
public void pushOpenedFile() throws Exception
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-27
17:25:49 UTC (rev 11054)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-27
17:27:23 UTC (rev 11055)
@@ -65,15 +65,11 @@
import org.hornetq.utils.DataConstants;
/**
- *
- * <p>A circular log implementation.</p
- *
- * <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
- *
+ * A circular log implementation.
+ * <p>
+ * Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
- *
*/
public class JournalImpl implements TestableJournal, JournalRecordProvider
{
@@ -1824,7 +1820,7 @@
compactor = null;
}
- autoReclaim = previousReclaimValue;
+ setAutoReclaim(previousReclaimValue);
}
}
@@ -2352,11 +2348,6 @@
this.autoReclaim = autoReclaim;
}
- public boolean isAutoReclaim()
- {
- return autoReclaim;
- }
-
public String debug() throws Exception
{
reclaimer.scan(getDataFiles());
@@ -3266,4 +3257,13 @@
}
}
+ public void writeLock()
+ {
+ journalLock.writeLock().lock();
+ }
+
+ public void writeUnlock()
+ {
+ journalLock.writeLock().unlock();
+ }
}