Author: borges
Date: 2011-07-01 09:31:52 -0400 (Fri, 01 Jul 2011)
New Revision: 10910
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Still with non-functional replication
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -53,7 +53,6 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.JournalReaderCallback;
@@ -98,13 +97,12 @@
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUID;
import org.hornetq.utils.XidCodecSupport;
/**
- *
+ *
* A JournalStorageManager
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
@@ -155,15 +153,13 @@
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
- private UUID persistentID;
-
private final BatchingIDGenerator idGenerator;
- private final ReplicationManager replicator;
+ private ReplicationManager replicator;
- private final Journal messageJournal;
+ private Journal messageJournal;
- private final Journal bindingsJournal;
+ private Journal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
@@ -330,6 +326,17 @@
return replicator != null;
}
+ public void setReplicator(ReplicationManager replicationManager)
+ {
+ assert replicationManager != null;
+ replicator = replicationManager;
+ Journal localMessageJournal = messageJournal;
+ Journal localBindingsJournal = bindingsJournal;
+ bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+ // XXX HORNETQ-720 obviously missing here is the synchronization step.
+ }
+
public void waitOnOperations() throws Exception
{
if (!started)
@@ -340,9 +347,6 @@
waitOnOperations(0);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
- */
public boolean waitOnOperations(final long timeout) throws Exception
{
if (!started)
@@ -756,11 +760,11 @@
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new
DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
-
+
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.UPDATE_DELIVERY_COUNT,
updateInfo,
-
+
syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -1137,19 +1141,19 @@
continue;
}
-
+
// Redistribution could install a Redistributor while we are still loading
records, what will be an issue with prepared ACKs
// We make sure te Queue is paused before we reroute values.
queue.pause();
Collection<AddMessageRecord> valueRecords = queueRecords.values();
-
+
long currentTime = System.currentTimeMillis();
for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
-
+
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <=
currentTime)
{
scheduledDeliveryTime = 0;
@@ -1224,7 +1228,7 @@
{
messageJournal.perfBlast(perfBlastPages);
}
-
+
for (Queue queue : queues.values())
{
queue.resume();
@@ -1442,8 +1446,8 @@
return bindingsInfo;
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#lineUpContext()
*/
@@ -2990,6 +2994,7 @@
this.refEncoding = refEncoding;
}
+ @Override
public String toString()
{
return "AddRef;" + refEncoding;
@@ -3006,6 +3011,7 @@
this.refEncoding = refEncoding;
}
+ @Override
public String toString()
{
return "ACK;" + refEncoding;
@@ -3022,6 +3028,7 @@
Message msg;
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -29,6 +29,7 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
@@ -75,7 +76,7 @@
: null,
server.getNodeID());
- Channel channel1 = rc.getChannel(1, -1);
+ Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
ChannelHandler handler = new HornetQPacketHandler(this, server, channel1, rc);
@@ -90,7 +91,7 @@
final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(),
ttl);
- final Channel channel0 = rc.getChannel(0, -1);
+ final Channel channel0 = rc.getChannel(CHANNEL_ID.PING.id, -1);
channel0.setHandler(new ChannelHandler()
{
@@ -157,10 +158,14 @@
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
System.out.println("HA_BACKUP_REGISTRATION: " + msg + "
connector=" + msg.getConnector());
+ long channelID = msg.getChannelID();
+ Channel channelX = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
+ Channel replicationChannel = rc.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
+ System.out.println("msg channelID: " + channelID);
System.out.println("HA_BR: " + server.getIdentity() + ",
toString=" + server);
try
{
- server.addHaBackup(msg.getConnector());
+ server.addHaBackup(channelX, replicationChannel);
}
catch (Exception e)
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -25,10 +25,12 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -113,6 +115,25 @@
break;
}
+ case PacketImpl.HA_BACKUP_REGISTRATION:
+ {
+ HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+ System.out.println("HA_BACKUP_REGISTRATION: " + msg + "
connector=" + msg.getConnector());
+ long channelID = msg.getChannelID();
+ Channel channelX = connection.getChannel(CHANNEL_ID.SESSION.id, -1);
+ Channel replicationChannel = connection.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
+ System.out.println("HA_BR: " + server.getIdentity() + ",
toString=" + server);
+ try
+ {
+ server.addHaBackup(channelX, replicationChannel);
+ }
+ catch (Exception e)
+ {
+ // XXX This is not what we want
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
default:
{
HornetQPacketHandler.log.error("Invalid packet " + packet);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -21,12 +21,13 @@
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.server.HornetQComponent;
/**
+ * Used by the {@link JournalStorageManager} to update the replicated journal.
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public interface ReplicationManager extends HornetQComponent
{
@@ -79,7 +80,7 @@
/**
* @param journalInfo
- * @throws HornetQException
+ * @throws HornetQException
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -30,7 +30,7 @@
import org.hornetq.core.replication.ReplicationManager;
/**
- * Used by the {@link JournalStorageManager} to replicate journal calls.
+ * Used by the {@link JournalStorageManager} to replicate journal calls.
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -59,10 +59,11 @@
private final byte journalID;
- public ReplicatedJournal(final byte journaID, final Journal localJournal, final
ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journalID, final Journal localJournal,
+ final ReplicationManager replicationManager)
{
super();
- journalID = journaID;
+ this.journalID = journalID;
this.localJournal = localJournal;
this.replicationManager = replicationManager;
}
@@ -201,10 +202,10 @@
}
replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
-
+
}
-
+
/**
* @param id
* @param sync
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -33,6 +33,7 @@
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
@@ -52,7 +53,7 @@
/**
* A ReplicationManagerImpl
- *
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public class ReplicationManagerImpl implements ReplicationManager
@@ -65,12 +66,11 @@
private final ResponseHandler responseHandler = new ResponseHandler();
- private final ClientSessionFactoryInternal sessionFactory;
+// private final ClientSessionFactoryInternal sessionFactory;
+// private CoreRemotingConnection replicatingConnection;
- private CoreRemotingConnection replicatingConnection;
+ private final Channel replicatingChannel;
- private Channel replicatingChannel;
-
private boolean started;
private volatile boolean enabled;
@@ -79,10 +79,12 @@
private final Queue<OperationContext> pendingTokens = new
ConcurrentLinkedQueue<OperationContext>();
- private final ExecutorFactory executorFactory;
+ private ExecutorFactory executorFactory;
private SessionFailureListener failureListener;
+ private final Channel systemChannel;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -90,15 +92,25 @@
public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory, final
ExecutorFactory executorFactory)
{
super();
- this.sessionFactory = sessionFactory;
this.executorFactory = executorFactory;
+
+ CoreRemotingConnection conn = sessionFactory.getConnection();
+ systemChannel = conn.getChannel(CHANNEL_ID.SESSION.id, -1);
+ replicatingChannel = conn.getChannel(CHANNEL_ID.REPLICATION.id, -1);
}
// Public --------------------------------------------------------
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[],
org.hornetq.core.replication.ReplicationToken)
+ /**
+ * @param systemChannel
+ * @param replicatingChannel
*/
+ public ReplicationManagerImpl(Channel systemChannel, Channel replicatingChannel)
+ {
+ super();
+ this.systemChannel = systemChannel;
+ this.replicatingChannel = replicatingChannel;
+ }
public void appendAddRecord(final byte journalID, final long id, final byte
recordType, final EncodingSupport record)
{
@@ -302,26 +314,21 @@
throw new IllegalStateException("ReplicationManager is already
started");
}
- replicatingConnection = sessionFactory.getConnection();
+// replicatingConnection = sessionFactory.getConnection();
+//
+// if (replicatingConnection == null)
+// {
+// ReplicationManagerImpl.log.warn("Backup server MUST be started before
live server. Initialisation will not proceed.");
+// throw new HornetQException(HornetQException.ILLEGAL_STATE,
+// "Backup server MUST be started before live
server. Initialisation will not proceed.");
+// }
- if (replicatingConnection == null)
- {
- ReplicationManagerImpl.log.warn("Backup server MUST be started before live
server. Initialisation will not proceed.");
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Backup server MUST be started before live
server. Initialisation will not proceed.");
- }
-
- long channelID = replicatingConnection.generateChannelID();
-
- Channel mainChannel = replicatingConnection.getChannel(1, -1);
-
- replicatingChannel = replicatingConnection.getChannel(channelID, -1);
-
replicatingChannel.setHandler(responseHandler);
- CreateReplicationSessionMessage replicationStartPackage = new
CreateReplicationSessionMessage(channelID);
+ CreateReplicationSessionMessage replicationStartPackage =
+ new CreateReplicationSessionMessage(replicatingChannel.getID());
- mainChannel.sendBlocking(replicationStartPackage);
+ systemChannel.sendBlocking(replicationStartPackage);
failureListener = new SessionFailureListener()
{
@@ -351,7 +358,7 @@
{
}
};
- sessionFactory.addFailureListener(failureListener);
+ // sessionFactory.addFailureListener(failureListener);
started = true;
@@ -390,15 +397,15 @@
replicatingChannel.close();
}
- sessionFactory.causeExit();
- sessionFactory.removeFailureListener(failureListener);
- if (replicatingConnection != null)
- {
- replicatingConnection.destroy();
- }
+// sessionFactory.causeExit();
+// sessionFactory.removeFailureListener(failureListener);
+// if (replicatingConnection != null)
+// {
+// replicatingConnection.destroy();
+// }
+//
+// replicatingConnection = null;
- replicatingConnection = null;
-
started = false;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -20,7 +20,6 @@
import javax.management.MBeanServer;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
@@ -177,9 +176,5 @@
void stop(boolean failoverOnServerShutdown) throws Exception;
- /**
- * @param connector
- * @throws Exception
- */
- void addHaBackup(TransportConfiguration connector) throws Exception;
+ void addHaBackup(Channel channelX, Channel replicationChannel) throws Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -44,7 +44,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
@@ -82,6 +81,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
@@ -123,7 +123,6 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -214,8 +213,6 @@
private final Map<String, ServerSession> sessions = new
ConcurrentHashMap<String, ServerSession>();
- private final Set<String> sharedNothingBackups = new
ConcurrentHashSet<String>();
-
private final Object initialiseLock = new Object();
private boolean initialised;
@@ -517,7 +514,7 @@
}
}
- private class SharedNothingBackupActivation implements Activation
+ private final class SharedNothingBackupActivation implements Activation
{
public void run()
{
@@ -550,11 +547,10 @@
}
log.info("announce backup to live-server (id=" + liveConnectorName
+ ")");
liveServerSessionFactory.getConnection()
- .getChannel(0, -1)
+ .getChannel(CHANNEL_ID.SESSION.id, -1)
.send(new
HaBackupRegistrationMessage(getNodeID().toString(), config));
log.info("backup registered");
-
started = true;
log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
@@ -639,7 +635,8 @@
}
else
{
- // Replicated
+ assert replicationEndpoint == null;
+ replicationEndpoint = new ReplicationEndpointImpl(this);
activation = new SharedNothingBackupActivation();
}
@@ -993,14 +990,12 @@
public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel
channel) throws Exception
{
- if (configuration.isBackup())
+ if (!configuration.isBackup())
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected
server is a backup server");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected
server is not a backup server " +
+ getIdentity());
}
- assert replicationEndpoint == null;
- replicationEndpoint = new ReplicationEndpointImpl(this);
-
if (replicationEndpoint == null)
System.err.println("endpoint is null!");
@@ -1262,23 +1257,23 @@
// Private
//
--------------------------------------------------------------------------------------
- private boolean startReplication(TransportConfiguration connector) throws Exception
- {
- assert !configuration.isSharedStore();
- if (configuration.isSharedStore() || connector == null)
- {
- return true;
- }
+// private boolean startReplication(TransportConfiguration connector) throws Exception
+// {
+// assert !configuration.isSharedStore();
+// if (configuration.isSharedStore() || connector == null)
+// {
+// return true;
+// }
+//
+// serverLocator = HornetQClient.createServerLocatorWithHA(connector);
+// ClientSessionFactoryInternal replicationFailoverManager =
+//
(ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
+// replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
executorFactory);
+// replicationManager.start();
+//
+// return true;
+// }
- serverLocator = HornetQClient.createServerLocatorWithHA(connector);
- ClientSessionFactoryInternal replicationFailoverManager =
-
(ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
- replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
executorFactory);
- replicationManager.start();
-
- return true;
- }
-
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1950,11 +1945,23 @@
}
@Override
- public void addHaBackup(TransportConfiguration connector) throws Exception
+ public void addHaBackup(Channel systemChannel, Channel replicatingChannel) throws
Exception
{
- log.info(connector + " " + connector.getFactoryClassName() + "
" + connector.getParams() + " " +
- replicationManager);
- startReplication(connector);
- // throw new UnsupportedOperationException("unimplemented");
+ if (!(storageManager instanceof JournalStorageManager))
+ return;
+ JournalStorageManager journalStorageManager =
(JournalStorageManager)storageManager;
+
+ System.out.println(HornetQServerImpl.class.getName() + " " +
this.getIdentity() +
+ ": create a ReplicationManagerImpl. Using ChannelID=" +
systemChannel);
+ // XXX not sure this is the right call to use
+// final ServerLocatorInternal serverLocator =
+//
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(connector);
+// ClientSessionFactoryInternal sessionFactory =
(ClientSessionFactoryInternal)serverLocator.createSessionFactory();
+ replicationManager = new ReplicationManagerImpl(systemChannel,
replicatingChannel);
+ System.out.println("rep.start()");
+ replicationManager.start();
+
+ System.out.println("add RepMan to JournalStorageManager...");
+ journalStorageManager.setReplicator(replicationManager);
}
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-01
13:30:25 UTC (rev 10909)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-01
13:31:52 UTC (rev 10910)
@@ -169,8 +169,7 @@
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_backup");
config1.getAcceptorConfigurations().clear();
config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- TransportConfiguration tc = getConnectorTransportConfiguration(true);
- config1.getConnectorConfigurations().put(LIVE_NODE_NAME, tc);
+ config1.getConnectorConfigurations().put(LIVE_NODE_NAME,
getConnectorTransportConfiguration(true));
//liveConfig.setBackupConnectorName("toBackup");
config1.setSecurityEnabled(false);