Author: borges
Date: 2011-07-04 06:46:18 -0400 (Mon, 04 Jul 2011)
New Revision: 10914
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Pass the executor to the replication manager.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-01 18:19:25 UTC (rev 10913)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-04 10:46:18 UTC (rev 10914)
@@ -317,41 +317,41 @@
private synchronized void initialise() throws Exception
{
- if (!readOnly)
+ if (readOnly)
{
- setThreadPools();
+ return;
+ }
- instantiateLoadBalancingPolicy();
+ setThreadPools();
+ instantiateLoadBalancingPolicy();
- if (discoveryGroupConfiguration != null)
+ if (discoveryGroupConfiguration != null)
+ {
+ InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
+ InetAddress lbAddress;
+
+ if (discoveryGroupConfiguration.getLocalBindAddress() != null)
{
- InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
+ lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
+ }
+ else
+ {
+ lbAddress = null;
+ }
- InetAddress lbAddress;
+ discoveryGroup =
+ new DiscoveryGroupImpl(nodeID,
+ discoveryGroupConfiguration.getName(),
+ lbAddress,
+ groupAddress,
+ discoveryGroupConfiguration.getGroupPort(),
+ discoveryGroupConfiguration.getRefreshTimeout());
- if (discoveryGroupConfiguration.getLocalBindAddress() != null)
- {
- lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
- }
- else
- {
- lbAddress = null;
- }
+ discoveryGroup.registerListener(this);
+ discoveryGroup.start();
+ }
- discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
-
- readOnly = true;
- }
+ readOnly = true;
}
private ServerLocatorImpl(final boolean useHA,
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 18:19:25 UTC (rev 10913)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-04 10:46:18 UTC (rev 10914)
@@ -79,7 +79,7 @@
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
- private ExecutorFactory executorFactory;
+ private final ExecutorFactory executorFactory;
private SessionFailureListener failureListener;
@@ -89,7 +89,10 @@
// Constructors --------------------------------------------------
- public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory, final ExecutorFactory executorFactory)
+ // XXX remove constructor once the other one is stable
+ @Deprecated
+ public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory,
+ final ExecutorFactory executorFactory)
{
super();
this.executorFactory = executorFactory;
@@ -101,8 +104,9 @@
/**
* @param remotingConnection
*/
- public ReplicationManagerImpl(CoreRemotingConnection remotingConnection)
+ public ReplicationManagerImpl(CoreRemotingConnection remotingConnection, final ExecutorFactory executorFactory)
{
+ this.executorFactory = executorFactory;
replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01 18:19:25 UTC (rev 10913)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-04 10:46:18 UTC (rev 10914)
@@ -1954,11 +1954,10 @@
System.out.println(HornetQServerImpl.class.getName() + " " +
this.getIdentity() +
": create a ReplicationManagerImpl");
- replicationManager = new ReplicationManagerImpl(rc);
- System.out.println("rep.start()");
+ replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
- System.out.println("add RepMan to JournalStorageManager...");
journalStorageManager.setReplicator(replicationManager);
+ System.out.println("HornetQServerImpl: ReplicationManagerImpl is started & added to JournalStorageManager...");
}
}