[hornetq-commits] JBoss hornetq SVN: r8121 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Oct 16 22:25:11 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-16 22:25:10 -0400 (Fri, 16 Oct 2009)
New Revision: 8121
Modified:
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/replication/ReplicationContext.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-125 - Replication stop on backup failure
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -54,10 +54,6 @@
return 1;
}
- public void flush()
- {
- }
-
public int calculateBlockStart(final int position) throws Exception
{
return position;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -1764,7 +1764,7 @@
private class FinishPageMessageOperation implements TransactionOperation
{
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -1100,7 +1100,7 @@
return Collections.emptySet();
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
@@ -1214,7 +1214,7 @@
this.refs = refs;
}
- public void afterCommit(Transaction tx) throws Exception
+ public void afterCommit(Transaction tx)
{
for (MessageReference ref : refs)
{
Modified: trunk/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -34,5 +34,8 @@
/** To be called when there are no more operations pending */
void complete();
+
+ /** Flush all pending callbacks on the Context */
+ void flush();
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -48,17 +48,17 @@
private final ReplicationManager replicationManager;
- private final Journal replicatedJournal;
+ private final Journal localJournal;
private final byte journalID;
public ReplicatedJournal(final byte journaID,
- final Journal replicatedJournal,
+ final Journal localJournal,
final ReplicationManager replicationManager)
{
super();
journalID = journaID;
- this.replicatedJournal = replicatedJournal;
+ this.localJournal = localJournal;
this.replicationManager = replicationManager;
}
@@ -100,7 +100,7 @@
trace("Append record id = " + id + " recordType = " + recordType);
}
replicationManager.appendAddRecord(journalID, id, recordType, record);
- replicatedJournal.appendAddRecord(id, recordType, record, sync);
+ localJournal.appendAddRecord(id, recordType, record, sync);
}
/**
@@ -134,7 +134,7 @@
trace("Append record TXid = " + id + " recordType = " + recordType);
}
replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
- replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ localJournal.appendAddRecordTransactional(txID, id, recordType, record);
}
/**
@@ -150,7 +150,7 @@
trace("AppendCommit " + txID);
}
replicationManager.appendCommitRecord(journalID, txID);
- replicatedJournal.appendCommitRecord(txID, sync);
+ localJournal.appendCommitRecord(txID, sync);
}
/**
@@ -166,7 +166,7 @@
trace("AppendDelete " + id);
}
replicationManager.appendDeleteRecord(journalID, id);
- replicatedJournal.appendDeleteRecord(id, sync);
+ localJournal.appendDeleteRecord(id, sync);
}
/**
@@ -195,7 +195,7 @@
trace("AppendDelete txID=" + txID + " id=" + id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
- replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+ localJournal.appendDeleteRecordTransactional(txID, id, record);
}
/**
@@ -211,7 +211,7 @@
trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
- replicatedJournal.appendDeleteRecordTransactional(txID, id);
+ localJournal.appendDeleteRecordTransactional(txID, id);
}
/**
@@ -240,7 +240,7 @@
trace("AppendPrepare txID=" + txID);
}
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
- replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+ localJournal.appendPrepareRecord(txID, transactionData, sync);
}
/**
@@ -256,7 +256,7 @@
trace("AppendRollback " + txID);
}
replicationManager.appendRollbackRecord(journalID, txID);
- replicatedJournal.appendRollbackRecord(txID, sync);
+ localJournal.appendRollbackRecord(txID, sync);
}
/**
@@ -287,7 +287,7 @@
trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
}
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
- replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+ localJournal.appendUpdateRecord(id, recordType, record, sync);
}
/**
@@ -324,7 +324,7 @@
trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
}
replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
- replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
}
/**
@@ -339,7 +339,7 @@
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure) throws Exception
{
- return replicatedJournal.load(committedRecords, preparedTransactions, transactionFailure);
+ return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
}
/**
@@ -350,7 +350,7 @@
*/
public long load(final LoaderCallback reloadManager) throws Exception
{
- return replicatedJournal.load(reloadManager);
+ return localJournal.load(reloadManager);
}
/**
@@ -360,7 +360,7 @@
*/
public void perfBlast(final int pages) throws Exception
{
- replicatedJournal.perfBlast(pages);
+ localJournal.perfBlast(pages);
}
/**
@@ -369,7 +369,7 @@
*/
public void start() throws Exception
{
- replicatedJournal.start();
+ localJournal.start();
}
/**
@@ -378,7 +378,7 @@
*/
public void stop() throws Exception
{
- replicatedJournal.stop();
+ localJournal.stop();
}
/* (non-Javadoc)
@@ -386,7 +386,7 @@
*/
public int getAlignment() throws Exception
{
- return replicatedJournal.getAlignment();
+ return localJournal.getAlignment();
}
/* (non-Javadoc)
@@ -394,7 +394,7 @@
*/
public boolean isStarted()
{
- return replicatedJournal.isStarted();
+ return localJournal.isStarted();
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -53,14 +53,22 @@
{
if (--pendings == 0)
{
- if (tasks != null)
+ flush();
+ }
+ }
+
+ /**
+ *
+ */
+ public void flush()
+ {
+ if (tasks != null)
+ {
+ for (Runnable run : tasks)
{
- for (Runnable run : tasks)
- {
- executor.execute(run);
- }
- tasks.clear();
+ executor.execute(run);
}
+ tasks.clear();
}
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -19,11 +19,13 @@
import java.util.concurrent.Executor;
import org.hornetq.core.client.impl.FailoverManager;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
@@ -40,8 +42,8 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationContext;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.core.replication.ReplicationContext;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.SimpleString;
@@ -79,11 +81,11 @@
private final Executor executor;
- private final ThreadLocal<ReplicationContext> repliToken = new ThreadLocal<ReplicationContext>();
+ private final ThreadLocal<ReplicationContext> tlReplicationContext = new ThreadLocal<ReplicationContext>();
private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
- private final ConcurrentHashSet<ReplicationContext> activeTokens = new ConcurrentHashSet<ReplicationContext>();
+ private final ConcurrentHashSet<ReplicationContext> activeContexts = new ConcurrentHashSet<ReplicationContext>();
// Static --------------------------------------------------------
@@ -255,7 +257,7 @@
sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
}
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageBegin(byte[])
*/
@@ -300,8 +302,6 @@
}
}
-
-
/* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
@@ -330,6 +330,22 @@
mainChannel.sendBlocking(replicationStartPackage);
+ failoverManager.addFailureListener(new FailureListener()
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ log.warn("Connection to the backup node failed, removing replication now");
+ try
+ {
+ stop();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ });
+
started = true;
enabled = true;
@@ -340,6 +356,16 @@
*/
public void stop() throws Exception
{
+ enabled = false;
+
+ for (ReplicationContext ctx : activeContexts)
+ {
+ ctx.complete();
+ ctx.flush();
+ }
+
+ activeContexts.clear();
+
if (replicatingChannel != null)
{
replicatingChannel.close();
@@ -353,16 +379,18 @@
}
connection = null;
+
+ started = false;
}
public ReplicationContext getContext()
{
- ReplicationContext token = repliToken.get();
+ ReplicationContext token = tlReplicationContext.get();
if (token == null)
{
token = new ReplicationContextImpl(executor);
- activeTokens.add(token);
- repliToken.set(token);
+ activeContexts.add(token);
+ tlReplicationContext.set(token);
}
return token;
}
@@ -380,17 +408,17 @@
*/
public void closeContext()
{
- final ReplicationContext token = repliToken.get();
+ final ReplicationContext token = tlReplicationContext.get();
if (token != null)
{
// Disassociate thread local
- repliToken.set(null);
+ tlReplicationContext.set(null);
// Remove from pending tokens as soon as this is complete
token.addReplicationAction(new Runnable()
{
public void run()
{
- activeTokens.remove(token);
+ activeContexts.remove(token);
}
});
}
@@ -401,7 +429,7 @@
*/
public Set<ReplicationContext> getActiveTokens()
{
- return activeTokens;
+ return activeContexts;
}
private void sendReplicatePacket(final Packet packet)
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -192,9 +192,9 @@
private boolean initialised;
private FailoverManager replicationFailoverManager;
-
+
private ReplicationManager replicationManager;
-
+
private ReplicationEndpoint replicationEndpoint;
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
@@ -251,7 +251,7 @@
addressSettingsRepository.setDefault(new AddressSettings());
- // this.managementConnectorID = managementConnectorSequence.decrementAndGet();
+ // this.managementConnectorID = managementConnectorSequence.decrementAndGet();
}
// lifecycle methods
@@ -351,7 +351,7 @@
{
storageManager.stop();
}
-
+
if (replicationEndpoint != null)
{
replicationEndpoint.stop();
@@ -407,7 +407,7 @@
{
memoryManager.stop();
}
-
+
pagingManager = null;
securityStore = null;
resourceManager = null;
@@ -603,22 +603,21 @@
return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
-
+
public synchronized ReplicationEndpoint createReplicationEndpoint(final Channel channel) throws Exception
{
if (!configuration.isBackup())
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
}
-
+
if (replicationEndpoint == null)
{
replicationEndpoint = new ReplicationEndpointImpl(this);
replicationEndpoint.setChannel(channel);
replicationEndpoint.start();
}
-
-
+
return replicationEndpoint;
}
@@ -660,81 +659,6 @@
}
}
- // public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
- // {
- // if (theUUID == null)
- // {
- // throw new IllegalArgumentException("node id is null");
- // }
- //
- // synchronized (initialiseLock)
- // {
- // if (initialised)
- // {
- // throw new IllegalStateException("Server is already initialised");
- // }
- //
- // this.uuid = theUUID;
- //
- // this.nodeID = new SimpleString(uuid.toString());
- //
- // initialisePart2();
- //
- // long backupID = storageManager.getCurrentUniqueID();
- //
- // if (liveUniqueID != backupID)
- // {
- // initialised = false;
- //
- // throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
- // ":" +
- // backupID +
- // "). You're probably trying to restart a live backup pair after a crash");
- // }
- //
- // log.info("Backup server is now operational");
- // }
- // }
-
- private boolean startReplication() throws Exception
- {
- String backupConnectorName = configuration.getBackupConnectorName();
-
- if (backupConnectorName != null)
- {
- TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
-
- if (backupConnector == null)
- {
- log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
- }
- else
- {
-
- replicationFailoverManager = new FailoverManagerImpl((ClientSessionFactory)null,
- backupConnector,
- null,
- false,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- 0,
- 1.0d,
- 0,
- 1,
- threadPool,
- scheduledPool,
- null);
-
-
- this.replicationManager = new ReplicationManagerImpl(replicationFailoverManager, this.executorFactory.getExecutor());
- replicationManager.start();
- }
- }
-
- return true;
- }
-
public HornetQServerControlImpl getHornetQServerControl()
{
return messagingServerControl;
@@ -841,6 +765,30 @@
// Protected
// ------------------------------------------------------------------------------------
+ /**
+ * Protected so tests can change this behaviour
+ * @param backupConnector
+ */
+ protected FailoverManagerImpl createBackupConnection(final TransportConfiguration backupConnector,
+ final ExecutorService threadPool,
+ final ScheduledExecutorService scheduledPool)
+ {
+ return new FailoverManagerImpl((ClientSessionFactory)null,
+ backupConnector,
+ null,
+ false,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ 0,
+ 1.0d,
+ 0,
+ 1,
+ threadPool,
+ scheduledPool,
+ null);
+ }
+
protected PagingManager createPagingManager()
{
return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
@@ -873,7 +821,34 @@
// Private
// --------------------------------------------------------------------------------------
+
+ private boolean startReplication() throws Exception
+ {
+ String backupConnectorName = configuration.getBackupConnectorName();
+ if (backupConnectorName != null)
+ {
+ TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+
+ if (backupConnector == null)
+ {
+ log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
+ }
+ else
+ {
+
+ replicationFailoverManager = createBackupConnection(backupConnector, threadPool, scheduledPool);
+
+ this.replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
+ this.executorFactory.getExecutor());
+ replicationManager.start();
+ }
+ }
+
+ return true;
+ }
+
+
private synchronized void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -895,10 +870,10 @@
log.warn("There is no replication endpoint, can't activate this backup server");
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
}
-
+
replicationEndpoint.stop();
}
-
+
// Complete the startup procedure
log.info("Activating server");
@@ -933,11 +908,7 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration);
- remotingService = new RemotingServiceImpl(configuration,
- this,
- managementService,
- threadPool,
- scheduledPool);
+ remotingService = new RemotingServiceImpl(configuration, this, managementService, threadPool, scheduledPool);
if (configuration.getMemoryMeasureInterval() != -1)
{
@@ -964,7 +935,6 @@
deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod());
}
-
startReplication();
this.storageManager = createStorageManager();
@@ -1009,7 +979,7 @@
this,
queueFactory,
scheduledPool,
- pagingManager,
+ pagingManager,
configuration.isBackup());
// Address settings need to deployed initially, since they're require on paging manager.start()
Property changes on: trunk/tests/src/org/hornetq/tests
___________________________________________________________________
Name: svn:ignore
+ svnignored
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-17 02:25:10 UTC (rev 8121)
@@ -15,6 +15,7 @@
import static org.hornetq.tests.util.RandomUtil.randomString;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -24,6 +25,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientSessionFactory;
@@ -46,6 +48,9 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.impl.ReplicatedJournal;
@@ -79,8 +84,6 @@
private ExecutorService executor;
- private FailoverManager connectionManager;
-
private ScheduledExecutorService scheduledExecutor;
// Static --------------------------------------------------------
@@ -98,11 +101,13 @@
HornetQServer server = new HornetQServerImpl(config);
+ FailoverManager failoverManager = createFailoverManager();
+
server.start();
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
manager.start();
manager.stop();
}
@@ -123,9 +128,12 @@
server.start();
+ FailoverManager failoverManager = createFailoverManager();
+
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+
try
{
manager.start();
@@ -154,9 +162,11 @@
server.start();
+ FailoverManager failoverManager = createFailoverManager();
+
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -212,12 +222,12 @@
server.getConfiguration(),
server.getExecutorFactory(),
server.getAddressSettingsRepository());
-
+
PagingStore store = pagingManager.getPageStore(dummy);
store.start();
assertEquals(5, store.getNumberOfPages());
store.stop();
-
+
manager.pageDeleted(dummy, 1);
manager.pageDeleted(dummy, 2);
manager.pageDeleted(dummy, 3);
@@ -226,25 +236,24 @@
manager.pageDeleted(dummy, 6);
blockOnReplication(manager);
-
+
ServerMessageImpl serverMsg = new ServerMessageImpl();
serverMsg.setMessageID(500);
serverMsg.setDestination(new SimpleString("tttt"));
-
-
+
HornetQBuffer buffer = ChannelBuffers.dynamicBuffer(100);
serverMsg.encodeProperties(buffer);
-
+
manager.largeMessageBegin(500);
manager.largeMessageWrite(500, new byte[1024]);
-
+
manager.largeMessageEnd(500);
-
+
blockOnReplication(manager);
-
+
store.start();
-
+
assertEquals(0, store.getNumberOfPages());
manager.stop();
@@ -255,26 +264,49 @@
}
}
-
public void testSendPacketsWithFailure() throws Exception
{
Configuration config = createDefaultConfig(false);
config.setBackup(true);
+
+ final AtomicBoolean returnIntercept = new AtomicBoolean(true);
+ final Interceptor intercept = new Interceptor()
+ {
+
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (returnIntercept.get())
+ {
+ System.out.println("Returning true");
+ }
+ return returnIntercept.get();
+ }
+
+ };
+
HornetQServer server = new HornetQServerImpl(config);
server.start();
+ final ArrayList<Interceptor> listInterceptor = new ArrayList<Interceptor>();
+ listInterceptor.add(intercept);
+
+ FailoverManager failoverManager = createFailoverManager(listInterceptor);
+
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
- for (int i = 0 ; i < 500; i++)
+ Thread.sleep(100);
+ returnIntercept.set(false);
+
+ for (int i = 0; i < 500; i++)
{
replicatedJournal.appendAddRecord(i, (byte)1, new FakeData(), false);
}
@@ -287,10 +319,12 @@
latch.countDown();
}
});
-
+
manager.closeContext();
- assertTrue(latch.await(10, TimeUnit.SECONDS));
+ server.stop();
+
+ assertTrue(latch.await(50, TimeUnit.SECONDS));
}
finally
{
@@ -314,7 +348,7 @@
}
});
-
+
assertTrue(latch.await(30, TimeUnit.SECONDS));
}
@@ -329,9 +363,11 @@
server.start();
+ FailoverManager failoverManager = createFailoverManager();
+
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -395,25 +431,7 @@
}
// Package protected ---------------------------------------------
- /*class LocalRemotingServiceImpl extends RemotingServiceImpl
- {
-
- public LocalRemotingServiceImpl(final Configuration config,
- final HornetQServer server,
- final ManagementService managementService,
- final Executor threadPool,
- final ScheduledExecutorService scheduledThreadPool)
- {
- super(config, server, managementService, threadPool, scheduledThreadPool);
- }
- protected ChannelHandler createHandler(RemotingConnection conn, Channel channel)
- {
- return super.createHandler(conn, channel);
- }
-
- }*/
-
// Protected -----------------------------------------------------
protected void setUp() throws Exception
@@ -426,25 +444,33 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
+ }
+
+ private FailoverManagerImpl createFailoverManager()
+ {
+ return createFailoverManager(null);
+ }
+
+ private FailoverManagerImpl createFailoverManager(List<Interceptor> interceptors)
+ {
TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
new HashMap<String, Object>(),
randomString());
- connectionManager = new FailoverManagerImpl((ClientSessionFactory)null,
- connectorConfig,
- null,
- false,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- 0,
- 1.0d,
- 0,
- 1,
- executor,
- scheduledExecutor,
- null);
-
+ return new FailoverManagerImpl((ClientSessionFactory)null,
+ connectorConfig,
+ null,
+ false,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ 0,
+ 1.0d,
+ 0,
+ 1,
+ executor,
+ scheduledExecutor,
+ interceptors);
}
protected void tearDown() throws Exception
@@ -456,8 +482,6 @@
tFactory = null;
- connectionManager = null;
-
scheduledExecutor = null;
super.tearDown();
More information about the hornetq-commits
mailing list