Author: timfox
Date: 2009-12-01 15:38:26 -0500 (Tue, 01 Dec 2009)
New Revision: 8484
Modified:
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
allow bridges to be deployed if clustered is false
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-12-01
20:18:47 UTC (rev 8483)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-12-01
20:38:26 UTC (rev 8484)
@@ -87,6 +87,8 @@
private boolean backup;
+ private final boolean clustered;
+
public ClusterManagerImpl(final org.hornetq.utils.ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -94,7 +96,8 @@
final ManagementService managementService,
final Configuration configuration,
final UUID nodeUUID,
- final boolean backup)
+ final boolean backup,
+ final boolean clustered)
{
if (nodeUUID == null)
{
@@ -116,6 +119,8 @@
this.nodeUUID = nodeUUID;
this.backup = backup;
+
+ this.clustered = clustered;
}
public synchronized void start() throws Exception
@@ -125,14 +130,22 @@
return;
}
- for (BroadcastGroupConfiguration config :
configuration.getBroadcastGroupConfigurations())
+ if (clustered)
{
- deployBroadcastGroup(config);
- }
+ for (BroadcastGroupConfiguration config :
configuration.getBroadcastGroupConfigurations())
+ {
+ deployBroadcastGroup(config);
+ }
- for (DiscoveryGroupConfiguration config :
configuration.getDiscoveryGroupConfigurations().values())
- {
- deployDiscoveryGroup(config);
+ for (DiscoveryGroupConfiguration config :
configuration.getDiscoveryGroupConfigurations().values())
+ {
+ deployDiscoveryGroup(config);
+ }
+
+ for (ClusterConnectionConfiguration config :
configuration.getClusterConfigurations())
+ {
+ deployClusterConnection(config);
+ }
}
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
@@ -140,11 +153,6 @@
deployBridge(config);
}
- for (ClusterConnectionConfiguration config :
configuration.getClusterConfigurations())
- {
- deployClusterConnection(config);
- }
-
started = true;
}
@@ -155,16 +163,29 @@
return;
}
- for (BroadcastGroup group : broadcastGroups.values())
+ if (clustered)
{
- group.stop();
- managementService.unregisterBroadcastGroup(group.getName());
- }
+ for (BroadcastGroup group : broadcastGroups.values())
+ {
+ group.stop();
+ managementService.unregisterBroadcastGroup(group.getName());
+ }
- for (DiscoveryGroup group : discoveryGroups.values())
- {
- group.stop();
- managementService.unregisterDiscoveryGroup(group.getName());
+ for (DiscoveryGroup group : discoveryGroups.values())
+ {
+ group.stop();
+ managementService.unregisterDiscoveryGroup(group.getName());
+ }
+
+ for (ClusterConnection clusterConnection : clusters.values())
+ {
+ clusterConnection.stop();
+ managementService.unregisterCluster(clusterConnection.getName().toString());
+ }
+
+ broadcastGroups.clear();
+
+ discoveryGroups.clear();
}
for (Bridge bridge : bridges.values())
@@ -173,16 +194,6 @@
managementService.unregisterBridge(bridge.getName().toString());
}
- for (ClusterConnection clusterConnection : clusters.values())
- {
- clusterConnection.stop();
- managementService.unregisterCluster(clusterConnection.getName().toString());
- }
-
- broadcastGroups.clear();
-
- discoveryGroups.clear();
-
bridges.clear();
started = false;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 20:18:47
UTC (rev 8483)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 20:38:26
UTC (rev 8484)
@@ -22,7 +22,6 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -53,10 +52,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
-import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.IOCompletion;
-import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.LogDelegateFactory;
@@ -102,8 +98,6 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.RoutingContext;
-import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
@@ -1160,20 +1154,18 @@
// Deply any pre-defined diverts
deployDiverts();
- if (configuration.isClustered())
- {
- // This can't be created until node id is set
- clusterManager = new ClusterManagerImpl(executorFactory,
- this,
- postOffice,
- scheduledPool,
- managementService,
- configuration,
- uuid,
- configuration.isBackup());
+ // This can't be created until node id is set
+ clusterManager = new ClusterManagerImpl(executorFactory,
+ this,
+ postOffice,
+ scheduledPool,
+ managementService,
+ configuration,
+ uuid,
+ configuration.isBackup(),
+ configuration.isClustered());
- clusterManager.start();
- }
+ clusterManager.start();
if (deploymentManager != null)
{
@@ -1198,7 +1190,7 @@
}
initialised = true;
-
+
log.info("********** initialised");
if (System.getProperty("org.hornetq.opt.routeblast") != null)
@@ -1493,47 +1485,45 @@
}
}
-
-// private void runRouteBlastNoWait() throws Exception
-// {
-// SimpleString address = new SimpleString("rbnw_address");
-// SimpleString queueName = new SimpleString("rbnw_name");
-//
-// createQueue(address, queueName, null, true, false, true);
-//
-// Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
-//
-// RBConsumer consumer = new RBConsumer(queue);
-//
-// queue.addConsumer(consumer);
-//
-// final int bodySize = 1024;
-//
-// byte[] body = new byte[bodySize];
-//
-// final int numMessages = 10000000;
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// final ServerMessage msg = new
ServerMessageImpl(storageManager.generateUniqueID(), 1500);
-//
-// msg.getBodyBuffer().writeBytes(body);
-//
-// msg.setDestination(address);
-//
-// msg.setDurable(true);
-//
-// postOffice.route(msg);
-// }
-// }
-
+ // private void runRouteBlastNoWait() throws Exception
+ // {
+ // SimpleString address = new SimpleString("rbnw_address");
+ // SimpleString queueName = new SimpleString("rbnw_name");
+ //
+ // createQueue(address, queueName, null, true, false, true);
+ //
+ // Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
+ //
+ // RBConsumer consumer = new RBConsumer(queue);
+ //
+ // queue.addConsumer(consumer);
+ //
+ // final int bodySize = 1024;
+ //
+ // byte[] body = new byte[bodySize];
+ //
+ // final int numMessages = 10000000;
+ //
+ // for (int i = 0; i < numMessages; i++)
+ // {
+ // final ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(),
1500);
+ //
+ // msg.getBodyBuffer().writeBytes(body);
+ //
+ // msg.setDestination(address);
+ //
+ // msg.setDurable(true);
+ //
+ // postOffice.route(msg);
+ // }
+ // }
+
private LinkedBlockingQueue<RouteBlastRunner> available = new
LinkedBlockingQueue<RouteBlastRunner>();
-
private void runRouteBlast() throws Exception
{
log.info("*** running route blast");
-
+
final int numThreads = 1;
final int numClients = 1000;
@@ -1565,7 +1555,7 @@
t.join();
}
}
-
+
class RouteBlastRunner implements Runnable
{
private SimpleString address;
@@ -1577,8 +1567,6 @@
this.address = address;
}
-
-
public void setup() throws Exception
{
final int numQueues = 1;
@@ -1595,7 +1583,7 @@
queue.addConsumer(consumer);
- //log.info("added consumer to queue " + queue);
+ // log.info("added consumer to queue " + queue);
consumers.add(consumer);
}
@@ -1639,10 +1627,7 @@
}
}
-
-
-
-
+
class Foo implements Runnable
{
public void run()
@@ -1662,7 +1647,7 @@
}
}
}
-
+
private class RBConsumer implements Consumer
{
private Queue queue;
@@ -1682,16 +1667,14 @@
reference.handled();
queue.acknowledge(reference);
-
- //log.info("acking");
+ // log.info("acking");
+
return HandleStatus.HANDLED;
}
}
-
-
// Inner classes
// --------------------------------------------------------------------------------
}