[hornetq-commits] JBoss hornetq SVN: r11765 - in trunk/hornetq-core/src/main/java/org/hornetq/core: persistence/impl/journal and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Nov 25 10:00:31 EST 2011
Author: borges
Date: 2011-11-25 10:00:31 -0500 (Fri, 25 Nov 2011)
New Revision: 11765
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-768 Reset replication state in JournalStorageManager in case of errors.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-11-25 15:00:31 UTC (rev 11765)
@@ -269,4 +269,11 @@
ServerMessage message,
RoutingContext ctx,
RouteContextList listCtx) throws Exception;
+
+ /**
+ * Stops the replication of data from the live to the backup, returning the storage to its unreplicated state.
+ * <p>
+ * Typical scenario is a broken connection. It is also invoked after a failed or partial replication start, so implementations must tolerate being called when replication is not active.
+ */
+ void stopReplication();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-25 15:00:31 UTC (rev 11765)
@@ -194,9 +194,11 @@
}
}
- private Journal messageJournal;
- private Journal bindingsJournal;
- private final SequentialFileFactory largeMessagesFactory;
+ private Journal messageJournal;
+ private Journal bindingsJournal;
+ private final Journal originalMessageJournal;
+ private final Journal originalBindingsJournal;
+ private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
@@ -235,18 +237,10 @@
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory,
final IOCriticalErrorListener criticalErrorListener)
{
- this(config, executorFactory, null, criticalErrorListener);
- }
-
- public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory,
- final ReplicationManager replicator, final IOCriticalErrorListener criticalErrorListener)
- {
this.executorFactory = executorFactory;
executor = executorFactory.getExecutor();
- this.replicator = replicator;
-
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
{
throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
@@ -274,19 +268,13 @@
"bindings",
1);
- if (replicator != null)
- {
- bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
- }
- else
- {
- bindingsJournal = localBindings;
- }
+ bindingsJournal = localBindings;
+ originalBindingsJournal = localBindings;
- if (journalDir == null)
- {
- throw new NullPointerException("journal-dir is null");
- }
+ if (journalDir == null)
+ {
+ throw new NullPointerException("journal-dir is null");
+ }
createJournalDir = config.isCreateJournalDir();
@@ -332,14 +320,8 @@
config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
: config.getJournalMaxIO_NIO());
- if (replicator != null)
- {
- messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
- }
- else
- {
- messageJournal = localMessage;
- }
+ messageJournal = localMessage;
+ originalMessageJournal = localMessage;
largeMessagesDirectory = config.getLargeMessagesDirectory();
largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
@@ -380,8 +362,8 @@
JournalFile[] messageFiles = null;
JournalFile[] bindingsFiles = null;
- final Journal localMessageJournal = messageJournal;
- final Journal localBindingsJournal = bindingsJournal;
+ try
+ {
Map<String, Long> largeMessageFilesToSync;
Map<SimpleString, Collection<Integer>> pageFilesToSync;
@@ -389,15 +371,15 @@
try
{
replicator = replicationManager;
- localMessageJournal.synchronizationLock();
- localBindingsJournal.synchronizationLock();
+ originalMessageJournal.synchronizationLock();
+ originalBindingsJournal.synchronizationLock();
try
{
pagingManager.lock();
try
{
- messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES, nodeID);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS, nodeID);
+ messageFiles = prepareJournalForCopy(originalMessageJournal, JournalContent.MESSAGES, nodeID);
+ bindingsFiles = prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID);
pageFilesToSync = getPageInformationForSync(pagingManager);
largeMessageFilesToSync = getLargeMessageInformation();
}
@@ -408,11 +390,11 @@
}
finally
{
- localMessageJournal.synchronizationUnlock();
- localBindingsJournal.synchronizationUnlock();
+ originalMessageJournal.synchronizationUnlock();
+ originalBindingsJournal.synchronizationUnlock();
}
- bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
- messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+ bindingsJournal = new ReplicatedJournal(((byte)0), originalBindingsJournal, replicator);
+ messageJournal = new ReplicatedJournal((byte)1, originalMessageJournal, replicator);
}
finally
{
@@ -433,11 +415,36 @@
}
finally
{
+ storageManagerLock.writeLock().unlock();
+ }
+ }
+ catch (Exception e)
+ {
+ stopReplication();
+ throw e;
+ }
+ }
+
+ /**
+ * Stops replication by resetting replication-related fields to their 'unreplicated' state: the message and bindings journals revert to the local journals created in the constructor, and {@code replicator} is cleared.
+ */
+ @Override
+ public void stopReplication()
+ {
+ // Guarded by the same write lock that startReplication releases after installing the ReplicatedJournal wrappers.
+ storageManagerLock.writeLock().lock();
+ try
+ {
+ bindingsJournal = originalBindingsJournal;
+ messageJournal = originalMessageJournal;
+ replicator = null; // NOTE(review): stop() is not called on the replicator here; presumably whoever created it stops it — confirm.
+ }
+ finally
+ {
storageManagerLock.writeLock().unlock();
}
}
-
/**
* @param pageFilesToSync
* @throws Exception
@@ -530,10 +537,10 @@
private JournalFile[]
prepareJournalForCopy(Journal journal, JournalContent contentType, String nodeID) throws Exception
{
- journal.forceMoveNextFile();
- JournalFile[] datafiles = journal.getDataFiles();
+ journal.forceMoveNextFile(); // NOTE(review): presumably rolls to a new file so the data files listed next are no longer appended to — confirm.
+ JournalFile[] datafiles = journal.getDataFiles(); // passed to replicator.sendStartSyncMessage and returned to the caller
replicator.sendStartSyncMessage(datafiles, contentType, nodeID);
- return datafiles;
+ return datafiles;
}
public void waitOnOperations() throws Exception
@@ -4105,4 +4112,5 @@
}
+
}
\ No newline at end of file
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-11-25 15:00:31 UTC (rev 11765)
@@ -618,4 +618,10 @@
{
return false;
}
+
+ /** Deliberately a no-op for the null storage manager. */
+ @Override
+ public void stopReplication()
+ {
+ }
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-25 15:00:31 UTC (rev 11765)
@@ -88,6 +88,7 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -227,6 +228,7 @@
private final Object initialiseLock = new Object();
private boolean initialised;
private final Object startUpLock = new Object();
+ private final Object replicationLock = new Object();
/**
* Only applicable to 'remote backup servers'. If this flag is false the backup may not become
@@ -1201,7 +1203,7 @@
{
if (configuration.isPersistenceEnabled())
{
- return new JournalStorageManager(configuration, executorFactory, replicationManager, shutdownOnCriticalIO);
+ return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
}
else
{
@@ -2265,30 +2267,60 @@
throw new HornetQException(HornetQException.ALREADY_REPLICATING);
}
- replicationManager = new ReplicationManagerImpl(rc, executorFactory);
- try
+ if (!isStarted())
{
- replicationManager.start();
- storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(), clusterConnection,
- pair);
+ throw new IllegalStateException();
}
- catch (Exception e)
+
+ synchronized (replicationLock)
{
- /*
- * The reasoning here is that the exception was either caused by (1) the (interaction with)
- * the backup, or (2) by an IO Error at the storage. If (1), we can swallow the exception
- * and ignore the replication request. If (2) the live will crash shortly.
- */
- log.warn("Exception when trying to start replication", e);
- replicationManager = null;
- if (e instanceof HornetQException)
+
+ if (replicationManager != null)
{
- throw (HornetQException)e;
+ throw new HornetQException(HornetQException.ALREADY_REPLICATING);
}
- else
+
+ rc.addFailureListener(new ReplicationFailureListener());
+ replicationManager = new ReplicationManagerImpl(rc, executorFactory);
+
+ try
{
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
+ replicationManager.start();
+ storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(),
+ clusterConnection, pair);
}
+ catch (Exception e)
+ {
+ /*
+ * The reasoning here is that the exception was either caused by (1) the (interaction
+ * with) the backup, or (2) by an IO Error at the storage. If (1), we can swallow the
+ * exception and ignore the replication request. If (2) the live will crash shortly.
+ */
+ log.warn("Exception when trying to start replication", e);
+
+ try
+ {
+ if (replicationManager != null)
+ replicationManager.stop();
+ }
+ catch (Exception hqe)
+ {
+ log.warn("Exception while trying to close replicationManager", hqe);
+ }
+ finally
+ {
+ replicationManager = null;
+ }
+
+ if (e instanceof HornetQException)
+ {
+ throw (HornetQException)e;
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
+ }
+ }
}
}
@@ -2309,4 +2341,54 @@
nodeManager.setNodeID(nodeID);
backupUpToDate = true;
}
+
+ /**
+ * Resets replication when the connection to the backup fails.
+ * <p>
+ * Detaches the storage manager from the replicator, then stops and clears
+ * {@code replicationManager}, so that a later backup can start replicating again instead of
+ * being rejected with {@code ALREADY_REPLICATING}.
+ */
+ private final class ReplicationFailureListener implements FailureListener
+ {
+
+ @Override
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ // Use the server's shared executor: an ExecutorService created per failure would leak its
+ // thread, since nothing ever shuts it down.
+ // NOTE(review): presumably off-loaded so the thread reporting the failure does not block
+ // on replicationLock — confirm.
+ executorFactory.getExecutor().execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (replicationLock)
+ {
+ if (replicationManager == null)
+ {
+ return;
+ }
+ // NOTE(review): this listener is not bound to a particular ReplicationManager, so a
+ // failure on a stale connection could reset a newer replication session — confirm.
+ storageManager.stopReplication();
+ try
+ {
+ replicationManager.stop();
+ }
+ catch (Exception e)
+ {
+ log.warn("Exception while trying to close replicationManager", e);
+ }
+ finally
+ {
+ replicationManager = null;
+ }
+ }
+ }
+ });
+ }
+ }
+
}
More information about the hornetq-commits
mailing list