Author: borges
Date: 2011-11-30 11:32:08 -0500 (Wed, 30 Nov 2011)
New Revision: 11797
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpoint.java
Removed:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
Log:
Delete the ReplicationEndPoint interface, rename 'Impl' from implementation
class.
Deleted:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -1,49 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.ChannelHandler;
-import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.impl.QuorumManager;
-
-/**
- * A ReplicationEndpoint
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface ReplicationEndpoint extends ChannelHandler, HornetQComponent
-{
-
- void setChannel(Channel channel);
-
- Channel getChannel();
-
- void compareJournalInformation(JournalLoadInformation[] journalInformation) throws
HornetQException;
-
- void registerJournal(final byte id, final Journal journal);
-
- /**
- * Sets the quorumManager used by the server in the replicationEndpoint. It is used to
inform the
- * backup server of the live's nodeID.
- * @param quorumManager
- */
- void setQuorumManager(QuorumManager quorumManager);
-
-}
Copied:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpoint.java
(from rev 11796,
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java)
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpoint.java
(rev 0)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpoint.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -0,0 +1,926 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.IOCriticalErrorListener;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.impl.FileWrapperJournal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
+import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QuorumManager;
+
+/**
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationEndpoint implements ChannelHandler, HornetQComponent
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ReplicationEndpoint.class);
+
+ // Attributes ----------------------------------------------------
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ private final IOCriticalErrorListener criticalErrorListener;
+
+ private final HornetQServerImpl server;
+
+ private Channel channel;
+
+ private Journal[] journals;
+ private final JournalLoadInformation[] journalLoadInformation = new
JournalLoadInformation[2];
+
+ /** Files reserved in each journal for synchronization of existing data from the
'live' server. */
+ private final Map<JournalContent, Map<Long, JournalSyncFile>>
filesReservedForSync =
+ new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
+ private final Map<Long, LargeServerMessage> largeMessagesOnSync = new
HashMap<Long, LargeServerMessage>();
+
+ /**
+ * Used to hold the real Journals before the backup is synchronized. This field should
be
+ * {@code null} on an up-to-date server.
+ */
+ private Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
+
+ private StorageManager storage;
+
+ private PagingManager pageManager;
+
+ private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
+ new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
+ private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
+ new ConcurrentHashMap<Long, LargeServerMessage>();
+
+ // Used on tests, to simulate failures on delete pages
+ private boolean deletePages = true;
+ private boolean started;
+
+ private QuorumManager quorumManager;
+
+ // Constructors --------------------------------------------------
+ public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener
criticalErrorListener)
+ {
+ this.server = server;
+ this.criticalErrorListener = criticalErrorListener;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void registerJournal(final byte id, final Journal journal)
+ {
+ if (journals == null || id >= journals.length)
+ {
+ Journal[] oldJournals = journals;
+ journals = new Journal[id + 1];
+
+ if (oldJournals != null)
+ {
+ for (int i = 0 ; i < oldJournals.length; i++)
+ {
+ journals[i] = oldJournals[i];
+ }
+ }
+ }
+
+ journals[id] = journal;
+ }
+
+ @Override
+ public void handlePacket(final Packet packet)
+ {
+ PacketImpl response = new ReplicationResponseMessage();
+ final byte type=packet.getType();
+
+ try
+ {
+ if (type == PacketImpl.REPLICATION_APPEND)
+ {
+ handleAppendAddRecord((ReplicationAddMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_APPEND_TX)
+ {
+ handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE)
+ {
+ handleAppendDelete((ReplicationDeleteMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE_TX)
+ {
+ handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PREPARE)
+ {
+ handlePrepare((ReplicationPrepareMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ {
+ handleCommitRollback((ReplicationCommitMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
+ {
+ handlePageWrite((ReplicationPageWriteMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
+ {
+ handlePageEvent((ReplicationPageEventMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
+ {
+ handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
+ {
+ handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
+ {
+ handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
+ {
+ handleCompareDataMessage((ReplicationCompareDataMessage)packet);
+ response = new NullResponseMessage();
+ }
+ else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
+ {
+ handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_SYNC_FILE)
+ {
+ handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
+ }
+ else
+ {
+ log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
+ }
+ }
+ catch (HornetQException e)
+ {
+ log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage(e);
+ }
+ catch (Exception e)
+ {
+ ReplicationEndpoint.log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage((HornetQException)e);
+ }
+
+ channel.send(response);
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ public synchronized void start() throws Exception
+ {
+ Configuration config = server.getConfiguration();
+ try
+ {
+ storage = server.getStorageManager();
+ storage.start();
+
+ server.getManagementService().setStorageManager(storage);
+
+ journalsHolder.put(JournalContent.BINDINGS, storage.getBindingsJournal());
+ journalsHolder.put(JournalContent.MESSAGES, storage.getMessageJournal());
+
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ {
+ filesReservedForSync.put(jc, new HashMap<Long, JournalSyncFile>());
+ // We only need to load internal structures on the backup...
+ journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
+ }
+
+ pageManager = new PagingManagerImpl(new
PagingStoreFactoryNIO(config.getPagingDirectory(),
+
config.getJournalBufferSize_NIO(),
+
server.getScheduledPool(),
+
server.getExecutorFactory(),
+
config.isJournalSyncNonTransactional(), criticalErrorListener),
+ storage,
+ server.getAddressSettingsRepository());
+
+ pageManager.start();
+
+ started = true;
+ }
+ catch (Exception e)
+ {
+ if (!server.isStopped())
+ throw e;
+ }
+ }
+
+ public synchronized void stop() throws Exception
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ // This could be null if the backup server is being
+ // shut down without any live server connecting here
+ if (channel != null)
+ {
+ channel.close();
+ }
+
+ for (ConcurrentMap<Integer, Page> map : pageIndex.values())
+ {
+ for (Page page : map.values())
+ {
+ try
+ {
+ page.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error while closing the page on backup", e);
+ }
+ }
+ }
+
+ pageIndex.clear();
+
+ for (LargeServerMessage largeMessage : largeMessages.values())
+ {
+ largeMessage.releaseResources();
+ }
+ largeMessages.clear();
+
+ for (LargeServerMessage largeMessage : largeMessagesOnSync.values())
+ {
+ largeMessage.releaseResources();
+ }
+ largeMessagesOnSync.clear();
+
+ for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry :
filesReservedForSync.entrySet())
+ {
+ for (JournalSyncFile filesReserved : entry.getValue().values())
+ {
+ filesReserved.close();
+ }
+ }
+
+ filesReservedForSync.clear();
+ if (journals != null)
+ {
+ for (Journal j : journals)
+ {
+ if (j instanceof FileWrapperJournal)
+ j.stop();
+ }
+ }
+
+ pageManager.stop();
+
+ // Storage needs to be the last to stop
+ storage.stop();
+
+ started = false;
+ }
+
+
+ public Channel getChannel()
+ {
+ return channel;
+ }
+
+ public void setChannel(final Channel channel)
+ {
+ this.channel = channel;
+ }
+
+ public void compareJournalInformation(final JournalLoadInformation[]
journalInformation) throws HornetQException
+ {
+ if (!server.isRemoteBackupUpToDate())
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot compare
journals if not in sync!");
+ }
+
+ if (journalLoadInformation == null || journalLoadInformation.length !=
journalInformation.length)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "Live Node contains more journals than the
backup node. Probably a version match error");
+ }
+
+ for (int i = 0; i < journalInformation.length; i++)
+ {
+ if (!journalInformation[i].equals(journalLoadInformation[i]))
+ {
+ log.warn("Journal comparison mismatch:\n" +
journalParametersToString(journalInformation));
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Backup node can't connect to the live
node as the data differs");
+ }
+ }
+
+ }
+
+ /** Used on tests only. To simulate missing page deletes*/
+ public void setDeletePages(final boolean deletePages)
+ {
+ this.deletePages = deletePages;
+ }
+
+ /**
+ * @param journalInformation
+ */
+ private String journalParametersToString(final JournalLoadInformation[]
journalInformation)
+ {
+ return "**********************************************************\n" +
"parameters:\n" +
+ "Bindings = " +
+ journalInformation[0] +
+ "\n" +
+ "Messaging = " +
+ journalInformation[1] +
+ "\n" +
+ "**********************************************************" +
+ "\n" +
+ "Expected:" +
+ "\n" +
+ "Bindings = " +
+ journalLoadInformation[0] +
+ "\n" +
+ "Messaging = " +
+ journalLoadInformation[1] +
+ "\n" +
+ "**********************************************************";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void finishSynchronization(String liveID) throws Exception
+ {
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ {
+ JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
+ journal.synchronizationLock();
+ try
+ {
+ if (journal.getDataFiles().length != 0)
+ {
+ throw new IllegalStateException("Journal should not have any data
files at this point");
+ }
+ // files should be already in place.
+ filesReservedForSync.remove(jc);
+ registerJournal(jc.typeByte, journal);
+ journal.stop();
+ journal.start();
+ journal.loadInternalOnly();
+ }
+ finally
+ {
+ journal.synchronizationUnlock();
+ }
+ }
+ synchronized (largeMessagesOnSync)
+ {
+ synchronized (largeMessages)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+ for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
+ {
+ Long id = entry.getKey();
+ LargeServerMessage lm = entry.getValue();
+ if (largeMessagesOnSync.containsKey(id))
+ {
+ SequentialFile sq = lm.getFile();
+ LargeServerMessage mainLM = largeMessagesOnSync.get(id);
+ SequentialFile mainSeqFile = mainLM.getFile();
+ for (;;)
+ {
+ buffer.rewind();
+ int size = sq.read(buffer);
+ mainSeqFile.writeInternal(buffer);
+ if (size < buffer.capacity())
+ {
+ break;
+ }
+ }
+ }
+ else
+ {
+ // these are large-messages created after sync started
+ largeMessagesOnSync.put(id, lm);
+ }
+ }
+ largeMessages.clear();
+ largeMessages.putAll(largeMessagesOnSync);
+ largeMessagesOnSync.clear();
+ }
+ }
+ journalsHolder = null;
+ quorumManager.setLiveID(liveID);
+ server.setRemoteBackupUpToDate(liveID);
+ log.info("Backup server " + server + " is synchronized with
live-server.");
+ return;
+ }
+
+ private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws
Exception
+ {
+ Long id = Long.valueOf(msg.getId());
+ byte[] data = msg.getData();
+ SequentialFile channel;
+ switch (msg.getFileType())
+ {
+ case LARGE_MESSAGE:
+ {
+ synchronized (largeMessagesOnSync)
+ {
+ LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+ if (largeMessage == null)
+ {
+ largeMessage = storage.createLargeMessage();
+ largeMessage.setDurable(true);
+ largeMessage.setMessageID(id);
+ largeMessagesOnSync.put(id, largeMessage);
+ }
+ channel = largeMessage.getFile();
+ }
+ break;
+ }
+ case PAGE:
+ {
+ Page page = getPage(msg.getPageStore(), (int)msg.getId());
+
+ channel = page.getFile();
+ break;
+ }
+ case JOURNAL:
+ {
+ JournalSyncFile journalSyncFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
+ FileChannel channel2 = journalSyncFile.getChannel();
+ if (data == null)
+ {
+ channel2.close();
+ return;
+ }
+ channel2.write(ByteBuffer.wrap(data));
+ return;
+ }
+ default:
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled
file type " + msg.getFileType());
+ }
+
+ if (data == null)
+ {
+ channel.close();
+ return;
+ }
+
+ if (!channel.isOpen())
+ {
+ channel.open(1, false);
+ }
+ channel.writeDirect(ByteBuffer.wrap(data), true);
+ }
+
+ /**
+ * Reserves files (with the given fileID) in the specified journal, and places a
+ * {@link FileWrapperJournal} in place to store messages while synchronization is
going on.
+ * @param packet
+ * @throws Exception
+ */
+ private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage
packet) throws Exception
+ {
+ if (server.isRemoteBackupUpToDate())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
+ }
+
+ if (packet.isSynchronizationFinished())
+ {
+ finishSynchronization(packet.getNodeID());
+ return;
+ }
+
+ final Journal journal = journalsHolder.get(packet.getJournalContentType());
+ synchronized (this)
+ {
+ if (!started)
+ return;
+ if (packet.getNodeID() != null)
+ {
+ quorumManager.setLiveID(packet.getNodeID());
+ }
+ Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
+ log.info("Journal " + packet.getJournalContentType() + ".
Reserving fileIDs for synchronization: " +
+ Arrays.toString(packet.getFileIds()));
+
+ for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
+ {
+ mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+ }
+ FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
+ registerJournal(packet.getJournalContentType().typeByte, syncJournal);
+ }
+ }
+
+ private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
+
+ if (message != null)
+ {
+ try
+ {
+ message.deleteFile();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error deleting large message ID = " +
packet.getMessageId(), e);
+ }
+ }
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet)
throws Exception
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false);
+ if (message != null)
+ {
+ message.addBytes(packet.getBody());
+ }
+ }
+
+ /**
+ * @param request
+ */
+ private void handleCompareDataMessage(final ReplicationCompareDataMessage request)
throws HornetQException
+ {
+ compareJournalInformation(request.getJournalInformation());
+ }
+
+ private LargeServerMessage lookupLargeMessage(final long messageId, final boolean
delete)
+ {
+ LargeServerMessage message;
+
+ if (delete)
+ {
+ message = largeMessages.remove(messageId);
+ }
+ else
+ {
+ message = largeMessages.get(messageId);
+ if (message == null)
+ {
+ synchronized (largeMessages)
+ {
+ if (!server.isRemoteBackupUpToDate())
+ {
+ // in case we need to append data to a file while still sync'ing
the backup
+ createLargeMessage(messageId, true);
+ message = largeMessages.get(messageId);
+ }
+ }
+ }
+ }
+
+ if (message == null)
+ {
+ log.warn("Large MessageID " + messageId +
+ " is not available on backup server.
Ignoring replication message");
+ }
+
+ return message;
+
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleLargeMessageBegin(final ReplicationLargeMessageBeingMessage
packet)
+ {
+ final long id = packet.getMessageId();
+ createLargeMessage(id, false);
+ log.trace("Receiving Large Message " + id + " on backup");
+ }
+
+ private void createLargeMessage(final long id, boolean sync)
+ {
+ LargeServerMessage msg = storage.createLargeMessage();
+ msg.setDurable(true);
+ msg.setMessageID(id);
+ msg.setReplicationSync(sync);
+ largeMessages.put(id, msg);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleCommitRollback(final ReplicationCommitMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ if (packet.isRollback())
+ {
+ journalToUse.appendRollbackRecord(packet.getTxId(), packet.getSync());
+ }
+ else
+ {
+ journalToUse.appendCommitRecord(packet.getTxId(), packet.getSync());
+ }
+ }
+
+ /**
+ * @param packet
+ */
+ private void handlePrepare(final ReplicationPrepareMessage packet) throws Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ journalToUse.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDeleteTX(final ReplicationDeleteTXMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ journalToUse.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(),
packet.getRecordData());
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDelete(final ReplicationDeleteMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ journalToUse.appendDeleteRecord(packet.getId(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendAddTXRecord(final ReplicationAddTXMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ if (packet.isUpdate())
+ {
+ journalToUse.appendUpdateRecordTransactional(packet.getTxId(),
+ packet.getId(),
+ packet.getRecordType(),
+ packet.getRecordData());
+ }
+ else
+ {
+ journalToUse.appendAddRecordTransactional(packet.getTxId(),
+ packet.getId(),
+ packet.getRecordType(),
+ packet.getRecordData());
+ }
+ }
+
+ /**
+ * @param packet
+ * @throws Exception
+ */
+ private void handleAppendAddRecord(final ReplicationAddMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ if (packet.isUpdate())
+ {
+ if (ReplicationEndpoint.trace)
+ {
+ log.trace("Endpoint appendUpdate id = " + packet.getId());
+ }
+ journalToUse.appendUpdateRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
+ }
+ else
+ {
+ if (ReplicationEndpoint.trace)
+ {
+ log.trace("Endpoint append id = " + packet.getId());
+ }
+ journalToUse.appendAddRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
+ }
+ }
+
+ /**
+ * @param packet
+ */
+ private void handlePageEvent(final ReplicationPageEventMessage packet) throws
Exception
+ {
+ ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
+
+ Page page = pages.remove(packet.getPageNumber());
+
+ if (page == null)
+ {
+ page = getPage(packet.getStoreName(), packet.getPageNumber());
+ }
+
+ if (page != null)
+ {
+ if (packet.isDelete())
+ {
+ if (deletePages)
+ {
+ page.delete(null);
+ }
+ }
+ else
+ {
+ page.close();
+ }
+ }
+
+ }
+
+ /**
+ * @param packet
+ */
+ private void handlePageWrite(final ReplicationPageWriteMessage packet) throws
Exception
+ {
+ PagedMessage pgdMessage = packet.getPagedMessage();
+ pgdMessage.initMessage(storage);
+ ServerMessage msg = pgdMessage.getMessage();
+ Page page = getPage(msg.getAddress(), packet.getPageNumber());
+ page.write(pgdMessage);
+ }
+
+ private ConcurrentMap<Integer, Page> getPageMap(final SimpleString storeName)
+ {
+ ConcurrentMap<Integer, Page> resultIndex = pageIndex.get(storeName);
+
+ if (resultIndex == null)
+ {
+ resultIndex = new ConcurrentHashMap<Integer, Page>();
+ ConcurrentMap<Integer, Page> mapResult = pageIndex.putIfAbsent(storeName,
resultIndex);
+ if (mapResult != null)
+ {
+ resultIndex = mapResult;
+ }
+ }
+
+ return resultIndex;
+ }
+
+ private Page getPage(final SimpleString storeName, final int pageId) throws Exception
+ {
+ ConcurrentMap<Integer, Page> map = getPageMap(storeName);
+
+ Page page = map.get(pageId);
+
+ if (page == null)
+ {
+ page = newPage(pageId, storeName, map);
+ }
+
+ return page;
+ }
+
+ /**
+ * @param pageId
+ * @param map
+ * @return
+ */
+ private synchronized Page newPage(final int pageId,
+ final SimpleString storeName,
+ final ConcurrentMap<Integer, Page> map) throws
Exception
+ {
+ Page page = map.get(pageId);
+
+ if (page == null)
+ {
+ page = pageManager.getPageStore(storeName).createPage(pageId);
+ page.open();
+ map.put(pageId, page);
+ }
+
+ return page;
+ }
+
+ /**
+ * @param journalID
+ * @return
+ */
+ private Journal getJournal(final byte journalID)
+ {
+ return journals[journalID];
+ }
+
+ public static class JournalSyncFile
+ {
+
+ private FileChannel channel;
+ private final File file;
+
+ public JournalSyncFile(JournalFile jFile) throws Exception
+ {
+ SequentialFile seqFile = jFile.getFile();
+ file = seqFile.getJavaFile();
+ seqFile.close();
+ }
+
+ FileChannel getChannel() throws Exception
+ {
+ if (channel == null)
+ {
+ channel = new FileOutputStream(file).getChannel();
+ }
+ return channel;
+ }
+
+ void close() throws IOException
+ {
+ if (channel != null)
+ channel.close();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JournalSyncFile(file=" + file.getAbsolutePath() +
")";
+ }
+ }
+
+ /**
+ * Sets the quorumManager used by the server in the replicationEndpoint. It is used to
inform the
+ * backup server of the live's nodeID.
+ * @param quorumManager
+ */
+ public void setQuorumManager(QuorumManager quorumManager)
+ {
+ this.quorumManager = quorumManager;
+ }
+}
Deleted:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -1,923 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.journal.IOCriticalErrorListener;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.impl.FileWrapperJournal;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.Page;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.impl.PagingManagerImpl;
-import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
-import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.server.impl.QuorumManager;
-
-/**
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationEndpointImpl implements ReplicationEndpoint
-{
-
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ReplicationEndpointImpl.class);
-
- // Attributes ----------------------------------------------------
-
- private static final boolean trace = log.isTraceEnabled();
-
- private final IOCriticalErrorListener criticalErrorListener;
-
- private final HornetQServerImpl server;
-
- private Channel channel;
-
- private Journal[] journals;
- private final JournalLoadInformation[] journalLoadInformation = new
JournalLoadInformation[2];
-
- /** Files reserved in each journal for synchronization of existing data from the
'live' server. */
- private final Map<JournalContent, Map<Long, JournalSyncFile>>
filesReservedForSync =
- new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
- private final Map<Long, LargeServerMessage> largeMessagesOnSync = new
HashMap<Long, LargeServerMessage>();
-
- /**
- * Used to hold the real Journals before the backup is synchronized. This field should
be
- * {@code null} on an up-to-date server.
- */
- private Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
-
- private StorageManager storage;
-
- private PagingManager pageManager;
-
- private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
- new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
- private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
- new ConcurrentHashMap<Long, LargeServerMessage>();
-
- // Used on tests, to simulate failures on delete pages
- private boolean deletePages = true;
- private boolean started;
-
- private QuorumManager quorumManager;
-
- // Constructors --------------------------------------------------
- public ReplicationEndpointImpl(final HornetQServerImpl server, IOCriticalErrorListener
criticalErrorListener)
- {
- this.server = server;
- this.criticalErrorListener = criticalErrorListener;
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public void registerJournal(final byte id, final Journal journal)
- {
- if (journals == null || id >= journals.length)
- {
- Journal[] oldJournals = journals;
- journals = new Journal[id + 1];
-
- if (oldJournals != null)
- {
- for (int i = 0 ; i < oldJournals.length; i++)
- {
- journals[i] = oldJournals[i];
- }
- }
- }
-
- journals[id] = journal;
- }
-
- @Override
- public void handlePacket(final Packet packet)
- {
- PacketImpl response = new ReplicationResponseMessage();
- final byte type=packet.getType();
-
- try
- {
- if (type == PacketImpl.REPLICATION_APPEND)
- {
- handleAppendAddRecord((ReplicationAddMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_APPEND_TX)
- {
- handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE)
- {
- handleAppendDelete((ReplicationDeleteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE_TX)
- {
- handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PREPARE)
- {
- handlePrepare((ReplicationPrepareMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
- {
- handleCommitRollback((ReplicationCommitMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
- {
- handlePageWrite((ReplicationPageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
- {
- handlePageEvent((ReplicationPageEventMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
- {
- handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
- {
- handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
- {
- handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
- {
- handleCompareDataMessage((ReplicationCompareDataMessage)packet);
- response = new NullResponseMessage();
- }
- else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
- {
- handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_SYNC_FILE)
- {
- handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
- }
- else
- {
- log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
- }
- }
- catch (HornetQException e)
- {
- log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage(e);
- }
- catch (Exception e)
- {
- ReplicationEndpointImpl.log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage((HornetQException)e);
- }
-
- channel.send(response);
- }
-
- public boolean isStarted()
- {
- return started;
- }
-
- public synchronized void start() throws Exception
- {
- Configuration config = server.getConfiguration();
- try
- {
- storage = server.getStorageManager();
- storage.start();
-
- server.getManagementService().setStorageManager(storage);
-
- journalsHolder.put(JournalContent.BINDINGS, storage.getBindingsJournal());
- journalsHolder.put(JournalContent.MESSAGES, storage.getMessageJournal());
-
- for (JournalContent jc : EnumSet.allOf(JournalContent.class))
- {
- filesReservedForSync.put(jc, new HashMap<Long, JournalSyncFile>());
- // We only need to load internal structures on the backup...
- journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
- }
-
- pageManager = new PagingManagerImpl(new
PagingStoreFactoryNIO(config.getPagingDirectory(),
-
config.getJournalBufferSize_NIO(),
-
server.getScheduledPool(),
-
server.getExecutorFactory(),
-
config.isJournalSyncNonTransactional(), criticalErrorListener),
- storage,
- server.getAddressSettingsRepository());
-
- pageManager.start();
-
- started = true;
- }
- catch (Exception e)
- {
- if (!server.isStopped())
- throw e;
- }
- }
-
- public synchronized void stop() throws Exception
- {
- if (!started)
- {
- return;
- }
-
- // This could be null if the backup server is being
- // shut down without any live server connecting here
- if (channel != null)
- {
- channel.close();
- }
-
- for (ConcurrentMap<Integer, Page> map : pageIndex.values())
- {
- for (Page page : map.values())
- {
- try
- {
- page.close();
- }
- catch (Exception e)
- {
- log.warn("Error while closing the page on backup", e);
- }
- }
- }
-
- pageIndex.clear();
-
- for (LargeServerMessage largeMessage : largeMessages.values())
- {
- largeMessage.releaseResources();
- }
- largeMessages.clear();
-
- for (LargeServerMessage largeMessage : largeMessagesOnSync.values())
- {
- largeMessage.releaseResources();
- }
- largeMessagesOnSync.clear();
-
- for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry :
filesReservedForSync.entrySet())
- {
- for (JournalSyncFile filesReserved : entry.getValue().values())
- {
- filesReserved.close();
- }
- }
-
- filesReservedForSync.clear();
- if (journals != null)
- {
- for (Journal j : journals)
- {
- if (j instanceof FileWrapperJournal)
- j.stop();
- }
- }
-
- pageManager.stop();
-
- // Storage needs to be the last to stop
- storage.stop();
-
- started = false;
- }
-
- @Override
- public Channel getChannel()
- {
- return channel;
- }
-
- @Override
- public void setChannel(final Channel channel)
- {
- this.channel = channel;
- }
-
- public void compareJournalInformation(final JournalLoadInformation[]
journalInformation) throws HornetQException
- {
- if (!server.isRemoteBackupUpToDate())
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot compare
journals if not in sync!");
- }
-
- if (journalLoadInformation == null || journalLoadInformation.length !=
journalInformation.length)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
- "Live Node contains more journals than the
backup node. Probably a version match error");
- }
-
- for (int i = 0; i < journalInformation.length; i++)
- {
- if (!journalInformation[i].equals(journalLoadInformation[i]))
- {
- log.warn("Journal comparison mismatch:\n" +
journalParametersToString(journalInformation));
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Backup node can't connect to the live
node as the data differs");
- }
- }
-
- }
-
- /** Used on tests only. To simulate missing page deletes*/
- public void setDeletePages(final boolean deletePages)
- {
- this.deletePages = deletePages;
- }
-
- /**
- * @param journalInformation
- */
- private String journalParametersToString(final JournalLoadInformation[]
journalInformation)
- {
- return "**********************************************************\n" +
"parameters:\n" +
- "Bindings = " +
- journalInformation[0] +
- "\n" +
- "Messaging = " +
- journalInformation[1] +
- "\n" +
- "**********************************************************" +
- "\n" +
- "Expected:" +
- "\n" +
- "Bindings = " +
- journalLoadInformation[0] +
- "\n" +
- "Messaging = " +
- journalLoadInformation[1] +
- "\n" +
- "**********************************************************";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void finishSynchronization(String liveID) throws Exception
- {
- for (JournalContent jc : EnumSet.allOf(JournalContent.class))
- {
- JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
- journal.synchronizationLock();
- try
- {
- if (journal.getDataFiles().length != 0)
- {
- throw new IllegalStateException("Journal should not have any data
files at this point");
- }
- // files should be already in place.
- filesReservedForSync.remove(jc);
- registerJournal(jc.typeByte, journal);
- journal.stop();
- journal.start();
- journal.loadInternalOnly();
- }
- finally
- {
- journal.synchronizationUnlock();
- }
- }
- synchronized (largeMessagesOnSync)
- {
- synchronized (largeMessages)
- {
- ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
- for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
- {
- Long id = entry.getKey();
- LargeServerMessage lm = entry.getValue();
- if (largeMessagesOnSync.containsKey(id))
- {
- SequentialFile sq = lm.getFile();
- LargeServerMessage mainLM = largeMessagesOnSync.get(id);
- SequentialFile mainSeqFile = mainLM.getFile();
- for (;;)
- {
- buffer.rewind();
- int size = sq.read(buffer);
- mainSeqFile.writeInternal(buffer);
- if (size < buffer.capacity())
- {
- break;
- }
- }
- }
- else
- {
- // these are large-messages created after sync started
- largeMessagesOnSync.put(id, lm);
- }
- }
- largeMessages.clear();
- largeMessages.putAll(largeMessagesOnSync);
- largeMessagesOnSync.clear();
- }
- }
- journalsHolder = null;
- quorumManager.setLiveID(liveID);
- server.setRemoteBackupUpToDate(liveID);
- log.info("Backup server " + server + " is synchronized with
live-server.");
- return;
- }
-
- private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws
Exception
- {
- Long id = Long.valueOf(msg.getId());
- byte[] data = msg.getData();
- SequentialFile channel;
- switch (msg.getFileType())
- {
- case LARGE_MESSAGE:
- {
- synchronized (largeMessagesOnSync)
- {
- LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
- if (largeMessage == null)
- {
- largeMessage = storage.createLargeMessage();
- largeMessage.setDurable(true);
- largeMessage.setMessageID(id);
- largeMessagesOnSync.put(id, largeMessage);
- }
- channel = largeMessage.getFile();
- }
- break;
- }
- case PAGE:
- {
- Page page = getPage(msg.getPageStore(), (int)msg.getId());
-
- channel = page.getFile();
- break;
- }
- case JOURNAL:
- {
- JournalSyncFile journalSyncFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
- FileChannel channel2 = journalSyncFile.getChannel();
- if (data == null)
- {
- channel2.close();
- return;
- }
- channel2.write(ByteBuffer.wrap(data));
- return;
- }
- default:
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled
file type " + msg.getFileType());
- }
-
- if (data == null)
- {
- channel.close();
- return;
- }
-
- if (!channel.isOpen())
- {
- channel.open(1, false);
- }
- channel.writeDirect(ByteBuffer.wrap(data), true);
- }
-
- /**
- * Reserves files (with the given fileID) in the specified journal, and places a
- * {@link FileWrapperJournal} in place to store messages while synchronization is
going on.
- * @param packet
- * @throws Exception
- */
- private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage
packet) throws Exception
- {
- if (server.isRemoteBackupUpToDate())
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
- }
-
- if (packet.isSynchronizationFinished())
- {
- finishSynchronization(packet.getNodeID());
- return;
- }
-
- final Journal journal = journalsHolder.get(packet.getJournalContentType());
- synchronized (this)
- {
- if (!started)
- return;
- if (packet.getNodeID() != null)
- {
- quorumManager.setLiveID(packet.getNodeID());
- }
- Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
- log.info("Journal " + packet.getJournalContentType() + ".
Reserving fileIDs for synchronization: " +
- Arrays.toString(packet.getFileIds()));
-
- for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
- {
- mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
- }
- FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
- registerJournal(packet.getJournalContentType().typeByte, syncJournal);
- }
- }
-
- private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
- {
- LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
-
- if (message != null)
- {
- try
- {
- message.deleteFile();
- }
- catch (Exception e)
- {
- log.warn("Error deleting large message ID = " +
packet.getMessageId(), e);
- }
- }
- }
-
- /**
- * @param packet
- */
- private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet)
throws Exception
- {
- LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false);
- if (message != null)
- {
- message.addBytes(packet.getBody());
- }
- }
-
- /**
- * @param request
- */
- private void handleCompareDataMessage(final ReplicationCompareDataMessage request)
throws HornetQException
- {
- compareJournalInformation(request.getJournalInformation());
- }
-
- private LargeServerMessage lookupLargeMessage(final long messageId, final boolean
delete)
- {
- LargeServerMessage message;
-
- if (delete)
- {
- message = largeMessages.remove(messageId);
- }
- else
- {
- message = largeMessages.get(messageId);
- if (message == null)
- {
- synchronized (largeMessages)
- {
- if (!server.isRemoteBackupUpToDate())
- {
- // in case we need to append data to a file while still sync'ing
the backup
- createLargeMessage(messageId, true);
- message = largeMessages.get(messageId);
- }
- }
- }
- }
-
- if (message == null)
- {
- log.warn("Large MessageID " + messageId +
- " is not available on backup server.
Ignoring replication message");
- }
-
- return message;
-
- }
-
- /**
- * @param packet
- */
- private void handleLargeMessageBegin(final ReplicationLargeMessageBeingMessage
packet)
- {
- final long id = packet.getMessageId();
- createLargeMessage(id, false);
- log.trace("Receiving Large Message " + id + " on backup");
- }
-
- private void createLargeMessage(final long id, boolean sync)
- {
- LargeServerMessage msg = storage.createLargeMessage();
- msg.setDurable(true);
- msg.setMessageID(id);
- msg.setReplicationSync(sync);
- largeMessages.put(id, msg);
- }
-
- /**
- * @param packet
- */
- private void handleCommitRollback(final ReplicationCommitMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- if (packet.isRollback())
- {
- journalToUse.appendRollbackRecord(packet.getTxId(), packet.getSync());
- }
- else
- {
- journalToUse.appendCommitRecord(packet.getTxId(), packet.getSync());
- }
- }
-
- /**
- * @param packet
- */
- private void handlePrepare(final ReplicationPrepareMessage packet) throws Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- journalToUse.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
- }
-
- /**
- * @param packet
- */
- private void handleAppendDeleteTX(final ReplicationDeleteTXMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- journalToUse.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(),
packet.getRecordData());
- }
-
- /**
- * @param packet
- */
- private void handleAppendDelete(final ReplicationDeleteMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- journalToUse.appendDeleteRecord(packet.getId(), false);
- }
-
- /**
- * @param packet
- */
- private void handleAppendAddTXRecord(final ReplicationAddTXMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- if (packet.isUpdate())
- {
- journalToUse.appendUpdateRecordTransactional(packet.getTxId(),
- packet.getId(),
- packet.getRecordType(),
- packet.getRecordData());
- }
- else
- {
- journalToUse.appendAddRecordTransactional(packet.getTxId(),
- packet.getId(),
- packet.getRecordType(),
- packet.getRecordData());
- }
- }
-
- /**
- * @param packet
- * @throws Exception
- */
- private void handleAppendAddRecord(final ReplicationAddMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- if (packet.isUpdate())
- {
- if (ReplicationEndpointImpl.trace)
- {
- log.trace("Endpoint appendUpdate id = " + packet.getId());
- }
- journalToUse.appendUpdateRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
- }
- else
- {
- if (ReplicationEndpointImpl.trace)
- {
- log.trace("Endpoint append id = " + packet.getId());
- }
- journalToUse.appendAddRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
- }
- }
-
- /**
- * @param packet
- */
- private void handlePageEvent(final ReplicationPageEventMessage packet) throws
Exception
- {
- ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
-
- Page page = pages.remove(packet.getPageNumber());
-
- if (page == null)
- {
- page = getPage(packet.getStoreName(), packet.getPageNumber());
- }
-
- if (page != null)
- {
- if (packet.isDelete())
- {
- if (deletePages)
- {
- page.delete(null);
- }
- }
- else
- {
- page.close();
- }
- }
-
- }
-
- /**
- * @param packet
- */
- private void handlePageWrite(final ReplicationPageWriteMessage packet) throws
Exception
- {
- PagedMessage pgdMessage = packet.getPagedMessage();
- pgdMessage.initMessage(storage);
- ServerMessage msg = pgdMessage.getMessage();
- Page page = getPage(msg.getAddress(), packet.getPageNumber());
- page.write(pgdMessage);
- }
-
- private ConcurrentMap<Integer, Page> getPageMap(final SimpleString storeName)
- {
- ConcurrentMap<Integer, Page> resultIndex = pageIndex.get(storeName);
-
- if (resultIndex == null)
- {
- resultIndex = new ConcurrentHashMap<Integer, Page>();
- ConcurrentMap<Integer, Page> mapResult = pageIndex.putIfAbsent(storeName,
resultIndex);
- if (mapResult != null)
- {
- resultIndex = mapResult;
- }
- }
-
- return resultIndex;
- }
-
- private Page getPage(final SimpleString storeName, final int pageId) throws Exception
- {
- ConcurrentMap<Integer, Page> map = getPageMap(storeName);
-
- Page page = map.get(pageId);
-
- if (page == null)
- {
- page = newPage(pageId, storeName, map);
- }
-
- return page;
- }
-
- /**
- * @param pageId
- * @param map
- * @return
- */
- private synchronized Page newPage(final int pageId,
- final SimpleString storeName,
- final ConcurrentMap<Integer, Page> map) throws
Exception
- {
- Page page = map.get(pageId);
-
- if (page == null)
- {
- page = pageManager.getPageStore(storeName).createPage(pageId);
- page.open();
- map.put(pageId, page);
- }
-
- return page;
- }
-
- /**
- * @param journalID
- * @return
- */
- private Journal getJournal(final byte journalID)
- {
- return journals[journalID];
- }
-
- public static class JournalSyncFile
- {
-
- private FileChannel channel;
- private final File file;
-
- public JournalSyncFile(JournalFile jFile) throws Exception
- {
- SequentialFile seqFile = jFile.getFile();
- file = seqFile.getJavaFile();
- seqFile.close();
- }
-
- FileChannel getChannel() throws Exception
- {
- if (channel == null)
- {
- channel = new FileOutputStream(file).getChannel();
- }
- return channel;
- }
-
- void close() throws IOException
- {
- if (channel != null)
- channel.close();
- }
-
- @Override
- public String toString()
- {
- return "JournalSyncFile(file=" + file.getAbsolutePath() +
")";
- }
- }
-
- @Override
- public void setQuorumManager(QuorumManager quorumManager)
- {
- this.quorumManager = quorumManager;
- }
-}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-30
16:31:32 UTC (rev 11796)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -30,8 +30,8 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
-import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicationEndpoint;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -91,9 +91,8 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
-import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
+import org.hornetq.core.replication.impl.ReplicationEndpoint;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
@@ -402,7 +401,7 @@
{
assert replicationEndpoint == null;
backupUpToDate = false;
- replicationEndpoint = new ReplicationEndpointImpl(this,
shutdownOnCriticalIO);
+ replicationEndpoint = new ReplicationEndpoint(this,
shutdownOnCriticalIO);
activation = new SharedNothingBackupActivation();
}
Modified:
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
---
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -15,7 +15,7 @@
import java.util.List;
-import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.replication.impl.ReplicationEndpoint;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.jms.persistence.config.PersistedConnectionFactory;
import org.hornetq.jms.persistence.config.PersistedDestination;
@@ -41,21 +41,21 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
void load() throws Exception;
-
+
void storeDestination(PersistedDestination destination) throws Exception;
void deleteDestination(PersistedType type, String name) throws Exception;
-
+
List<PersistedDestination> recoverDestinations();
-
+
void deleteConnectionFactory(String connectionFactory) throws Exception;
-
+
void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws
Exception;
-
+
List<PersistedConnectionFactory> recoverConnectionFactories();
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -65,17 +65,17 @@
// Inner classes -------------------------------------------------
void addJNDI(PersistedType type, String name, String ... address) throws Exception;
-
+
List<PersistedJNDI> recoverPersistedJNDI() throws Exception;
-
+
void deleteJNDI(PersistedType type, String name, String address) throws Exception;
-
+
void deleteJNDI(PersistedType type, String name) throws Exception;
/**
* Add the journal here to the replication endpoint
* @param replicationEndpoint
- * @throws Exception
+ * @throws Exception
*/
void installReplication(ReplicationEndpoint replicationEndpoint) throws Exception;
}
Modified:
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
===================================================================
---
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -29,9 +29,9 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.impl.ReplicatedJournal;
+import org.hornetq.core.replication.impl.ReplicationEndpoint;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.config.PersistedConnectionFactory;
Modified:
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
---
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -16,7 +16,7 @@
import java.util.Collections;
import java.util.List;
-import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.replication.impl.ReplicationEndpoint;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.config.PersistedConnectionFactory;
import org.hornetq.jms.persistence.config.PersistedDestination;
@@ -38,7 +38,7 @@
*/
public void deleteConnectionFactory(String connectionFactory) throws Exception
{
-
+
}
/* (non-Javadoc)
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -15,7 +15,7 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.replication.impl.ReplicationEndpoint;
import org.hornetq.spi.core.protocol.RemotingConnection;
/**
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-11-30
16:31:32 UTC (rev 11796)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-11-30
16:32:08 UTC (rev 11797)
@@ -44,12 +44,11 @@
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.replication.impl.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnection;
@@ -73,17 +72,13 @@
* A JMSServerControlTest
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
+ *
* Created 14 nov. 2008 13:35:10
*
*
*/
public class JMSServerControlTest extends ManagementTestBase
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(JMSServerControlTest.class);
-
// Attributes ----------------------------------------------------
protected InVMContext context;
@@ -519,10 +514,10 @@
1, // reconnectAttempts
true, // failoverOnInitialConnection
"tst"); // groupID
-
-
+
+
HornetQQueueConnectionFactory cf =
(HornetQQueueConnectionFactory)context.lookup("tst");
-
+
assertEquals(true, cf.isHA());
assertEquals("tst", cf.getClientID());
assertEquals(1, cf.getClientFailureCheckPeriod());
@@ -551,15 +546,15 @@
assertEquals(1, cf.getReconnectAttempts());
assertEquals(true, cf.isFailoverOnInitialConnection());
assertEquals("tst", cf.getGroupID());
-
+
stopServer();
-
+
startServer();
-
+
control = createManagementControl();
-
+
cf = (HornetQQueueConnectionFactory)context.lookup("tst");
-
+
assertEquals(true, cf.isHA());
assertEquals("tst", cf.getClientID());
assertEquals(1, cf.getClientFailureCheckPeriod());
@@ -588,19 +583,19 @@
assertEquals(1, cf.getReconnectAttempts());
assertEquals(true, cf.isFailoverOnInitialConnection());
assertEquals("tst", cf.getGroupID());
-
+
control.destroyConnectionFactory("test");
-
+
ObjectNameBuilder nameBuilder =
ObjectNameBuilder.create(ConfigurationImpl.DEFAULT_JMX_DOMAIN);
assertFalse(mbeanServer.isRegistered(nameBuilder.getConnectionFactoryObjectName("test")));
-
+
stopServer();
-
+
startServer();
-
+
assertFalse(mbeanServer.isRegistered(nameBuilder.getConnectionFactoryObjectName("test")));
-
-
+
+
try
{
cf = (HornetQQueueConnectionFactory)context.lookup("tst");
@@ -609,8 +604,8 @@
catch (NamingException e)
{
}
-
-
+
+
}
public void testListPreparedTransactionDetails() throws Exception
@@ -737,9 +732,9 @@
serverManager.setContext(context);
serverManager.start();
serverManager.activated();
-
+
this.fakeJMSStorageManager = new
FakeJMSStorageManager(serverManager.getJMSStorageManager());
-
+
serverManager.replaceStorageManager(fakeJMSStorageManager);
}
@@ -819,9 +814,9 @@
Map<String, PersistedConnectionFactory> connectionFactoryMap = new
HashMap<String, PersistedConnectionFactory>();
ConcurrentHashMap<String, List<String>> persistedJNDIMap = new
ConcurrentHashMap<String, List<String>>();
-
+
JMSStorageManager delegate;
-
+
public FakeJMSStorageManager(JMSStorageManager delegate)
{
this.delegate = delegate;