[hornetq-commits] JBoss hornetq SVN: r7986 - in branches/Branch_Replication_Changes: src/config/common/schema and 20 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Sep 24 05:25:46 EDT 2009
Author: timfox
Date: 2009-09-24 05:25:44 -0400 (Thu, 24 Sep 2009)
New Revision: 7986
Added:
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java
Removed:
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java
Modified:
branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml
branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd
branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml
branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
replication changes
Modified: branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml
===================================================================
--- branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/docs/user-manual/en/configuration-index.xml 2009-09-24 09:25:44 UTC (rev 7986)
@@ -325,14 +325,6 @@
<entry>true</entry>
</row>
<row>
- <entry><link linkend="queue.activation.timeout"
- >queue-activation-timeout</link></entry>
- <entry>Long</entry>
- <entry>after failover occurs, this timeout specifies how long (in ms) to
- wait for consumers to re-attach before starting delivery</entry>
- <entry>30000</entry>
- </row>
- <row>
<entry><link linkend="server.scheduled.thread.pool"
>scheduled-thread-pool-max-size</link></entry>
<entry>Integer</entry>
Modified: branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/config/common/schema/hornetq-jms.xsd 2009-09-24 09:25:44 UTC (rev 7986)
@@ -102,6 +102,9 @@
<xsd:element name="reconnect-attempts" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
+ <xsd:element name="use-reattach" type="xsd:boolean"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
<xsd:element name="failover-on-server-shutdown" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -861,12 +861,8 @@
return true;
}
- log.info("session handling failover");
-
boolean ok = false;
- log.info("Failover occurring");
-
// Need to stop all consumers outside the lock
for (ClientConsumerInternal consumer : consumers.values())
{
@@ -882,21 +878,13 @@
consumer.clearAtFailover();
}
- log.info("stopped consumers");
-
// We lock the channel to prevent any packets being sent during the failover process
channel.lock();
- log.info("got lock");
-
try
{
- log.info("transferring connection");
-
channel.transferConnection(backupConnection);
- log.info("transferred connection");
-
remotingConnection = backupConnection;
Packet request = new CreateSessionMessage(name,
@@ -913,15 +901,10 @@
Channel channel1 = backupConnection.getChannel(1, -1, false);
- log.info("sending create session");
-
CreateSessionResponseMessage response = (CreateSessionResponseMessage)channel1.sendBlocking(request);
- log.info("got response from create session");
-
if (response.isCreated())
{
- log.info("craeted ok");
// Session was created ok
// Now we need to recreate the consumers
@@ -959,7 +942,7 @@
conn.write(buffer, false);
}
}
-
+
if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
// Session is transacted - set for rollback only
@@ -990,18 +973,14 @@
}
ok = true;
-
- log.info("session created ok");
}
else
{
- log.info("not created ok");
// This means the server we failed onto is not ready to take new sessions - perhaps it hasn't actually
// failed over
}
// We cause any blocking calls to return - since they won't get responses.
- log.info("calling returnblocking");
channel.returnBlocking();
}
catch (Throwable t)
@@ -1012,7 +991,7 @@
{
channel.unlock();
}
-
+
return ok;
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -551,9 +551,9 @@
// until failover is complete
boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
-
+
boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0) && (failoverOnServerShutdown || !serverShutdown);
-
+
if (attemptFailoverOrReconnect)
{
lockAllChannel1s();
@@ -784,6 +784,8 @@
ok = false;
}
}
+
+ log.info("Reconnected ok");
}
return ok;
@@ -792,7 +794,7 @@
private RemotingConnection getConnectionWithRetry(final int initialRefCount, final int reconnectAttempts)
{
long interval = retryInterval;
-
+
int count = 0;
while (true)
@@ -1121,7 +1123,7 @@
public void run()
{
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
- "The connection was exitLoop by the server"));
+ "The connection was disconnected because of server shutdown"));
}
});
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -92,7 +92,7 @@
}
URL url = getClass().getClassLoader().getResource(configurationUrl);
- log.info("Loading server configuration from " + url);
+ log.debug("Loading server configuration from " + url);
Reader reader = new InputStreamReader(url.openStream());
String xml = org.hornetq.utils.XMLUtil.readerToString(reader);
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -125,7 +125,7 @@
try
{
- log.info("Deploying " + url + " for " + deployer.getClass().getSimpleName());
+ log.debug("Deploying " + url + " for " + deployer.getClass().getSimpleName());
deployer.deploy(url);
}
catch (Exception e)
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -32,7 +32,7 @@
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
-
+
private final Bridge bridge;
private final BridgeConfiguration configuration;
@@ -41,8 +41,7 @@
// Constructors --------------------------------------------------
- public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration)
- throws Exception
+ public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration) throws Exception
{
super(BridgeControl.class);
this.bridge = bridge;
@@ -54,10 +53,10 @@
public String[] getConnectorPair() throws Exception
{
String[] pair = new String[2];
-
+
pair[0] = configuration.getConnectorPair().a;
pair[1] = configuration.getConnectorPair().b != null ? configuration.getConnectorPair().b : null;
-
+
return pair;
}
@@ -70,7 +69,7 @@
{
return configuration.getQueueName();
}
-
+
public String getDiscoveryGroupName()
{
return configuration.getDiscoveryGroupName();
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -389,15 +389,12 @@
public String[] listRemoteAddresses()
{
- log.info("listing remote addresses");
Set<RemotingConnection> connections = remotingService.getConnections();
String[] remoteAddresses = new String[connections.size()];
int i = 0;
for (RemotingConnection connection : connections)
{
- log.info("connection " + connection + " is on server");
-
remoteAddresses[i++] = connection.getRemoteAddress();
}
return remoteAddresses;
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -169,16 +169,16 @@
}
public HornetQServerControlImpl registerServer(final PostOffice postOffice,
- final StorageManager storageManager,
- final Configuration configuration,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final HierarchicalRepository<Set<Role>> securityRepository,
- final ResourceManager resourceManager,
- final RemotingService remotingService,
- final HornetQServer messagingServer,
- final QueueFactory queueFactory,
- final ScheduledExecutorService scheduledThreadPool,
- final boolean backup) throws Exception
+ final StorageManager storageManager,
+ final Configuration configuration,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final HierarchicalRepository<Set<Role>> securityRepository,
+ final ResourceManager resourceManager,
+ final RemotingService remotingService,
+ final HornetQServer messagingServer,
+ final QueueFactory queueFactory,
+ final ScheduledExecutorService scheduledThreadPool,
+ final boolean backup) throws Exception
{
this.postOffice = postOffice;
this.addressSettingsRepository = addressSettingsRepository;
@@ -191,12 +191,12 @@
messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
messagingServerControl = new HornetQServerControlImpl(postOffice,
- configuration,
- resourceManager,
- remotingService,
- messagingServer,
- messageCounterManager,
- broadcaster);
+ configuration,
+ resourceManager,
+ remotingService,
+ messagingServer,
+ messageCounterManager,
+ broadcaster);
ObjectName objectName = ObjectNames.getHornetQServerObjectName();
registerInJMX(objectName, messagingServerControl);
registerInRegistry(ResourceNames.CORE_SERVER, messagingServerControl);
@@ -298,7 +298,7 @@
public void unregisterAcceptors()
{
- List<String> acceptors = new ArrayList<String>();
+ List<String> acceptors = new ArrayList<String>();
for (String resourceName : registry.keySet())
{
if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
@@ -306,7 +306,7 @@
acceptors.add(resourceName);
}
}
-
+
for (String acceptor : acceptors)
{
String name = acceptor.substring(ResourceNames.CORE_ACCEPTOR.length());
@@ -320,7 +320,7 @@
}
}
}
-
+
public synchronized void unregisterAcceptor(final String name) throws Exception
{
ObjectName objectName = ObjectNames.getAcceptorObjectName(name);
@@ -478,7 +478,7 @@
}
private Set<ObjectName> registeredNames = new HashSet<ObjectName>();
-
+
public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception
{
if (!jmxManagementEnabled)
@@ -595,14 +595,15 @@
}
if (!unexpectedResourceNames.isEmpty())
{
- log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() + " unexpected registered MBeans");
+ log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() +
+ " unexpected registered MBeans");
}
for (ObjectName on : this.registeredNames)
{
try
{
- mbeanServer.unregisterMBean(on);
+ mbeanServer.unregisterMBean(on);
}
catch (Exception ignore)
{
@@ -611,16 +612,19 @@
}
}
- messageCounterManager.stop();
+ if (messageCounterManager != null)
+ {
+ messageCounterManager.stop();
- messageCounterManager.resetAllCounters();
+ messageCounterManager.resetAllCounters();
- messageCounterManager.resetAllCounterHistories();
+ messageCounterManager.resetAllCounterHistories();
- messageCounterManager.clear();
-
+ messageCounterManager.clear();
+ }
+
registeredNames.clear();
-
+
started = false;
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -403,7 +403,7 @@
}
for (Bindable bindable : chosen)
- {
+ {
bindable.route(message, tx);
}
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -85,8 +85,6 @@
private volatile boolean started;
- // private volatile boolean backup;
-
private final ManagementService managementService;
private final Reaper reaperRunnable = new Reaper();
@@ -127,7 +125,7 @@
final ManagementService managementService,
final long reaperPeriod,
final int reaperPriority,
- final boolean enableWildCardRouting,
+ final boolean enableWildCardRouting,
final int idCacheSize,
final boolean persistIDCache,
final ExecutorFactory orderedExecutorFactory,
@@ -182,7 +180,7 @@
// This is to avoid thread leakages where the Reaper would run beyong the life cycle of the PostOffice
started = true;
- startExpiryScanner();
+ startExpiryScanner();
}
public synchronized void stop() throws Exception
@@ -344,8 +342,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay,
- redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
}
}
}
@@ -415,8 +412,7 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay,
- redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
}
}
}
@@ -870,7 +866,7 @@
log.warn("Reaper thread being restarted");
closed = false;
}
-
+
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
while (PostOfficeImpl.this.isStarted())
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -63,7 +63,11 @@
Version getVersion();
HornetQServerControlImpl getHornetQServerControl();
+
+ void registerActivateCallback(ActivateCallback callback);
+ void unregisterActivateCallback(ActivateCallback callback);
+
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
CreateSessionResponseMessage createSession(String name,
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -241,7 +241,7 @@
}
private void cancelRefs() throws Exception
- {
+ {
MessageReference ref;
LinkedList<MessageReference> list = new LinkedList<MessageReference>();
@@ -251,10 +251,17 @@
list.addFirst(ref);
}
+ Queue queue = null;
for (MessageReference ref2 : list)
{
- ref2.getQueue().cancel(ref2);
+ queue = ref2.getQueue();
+ queue.cancel(ref2);
}
+
+ if (queue != null)
+ {
+ queue.deliverAsync(executor);
+ }
}
public void stop() throws Exception
@@ -495,10 +502,116 @@
{
if (started)
{
- executor.execute(new FailRunnable());
+ //executor.execute(new FailRunnable());
+
+ try
+ {
+ cancelRefs();
+
+ //setupNotificationConsumer();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle failure", e);
+ }
}
}
+
+ private ClientConsumer notifConsumer;
+
+ // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
+ // connection is opened and closed - we can use
+ // a callback to tell us that
+ private void setupNotificationConsumer() throws Exception
+ {
+ if (flowRecord != null)
+ {
+ if (notifConsumer != null)
+ {
+ try
+ {
+ notifConsumer.close();
+
+ notifConsumer = null;
+ }
+ catch (HornetQException e)
+ {
+ log.error("Failed to close consumer", e);
+ }
+ }
+
+ // Get the queue data
+ // Create a queue to catch the notifications - the name must be deterministic on live and backup, but
+ // different each time this is called
+ // Otherwise it may already exist if server is restarted before it has been deleted on backup
+
+ String qName = "notif." + nodeUUID.toString() + "." + name.toString();
+
+ SimpleString notifQueueName = new SimpleString(qName);
+
+ SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
+ BindingType.DIVERT.toInt() +
+ " AND " +
+ ManagementHelper.HDR_NOTIFICATION_TYPE +
+ " IN ('" +
+ NotificationType.BINDING_ADDED +
+ "','" +
+ NotificationType.BINDING_REMOVED +
+ "','" +
+ NotificationType.CONSUMER_CREATED +
+ "','" +
+ NotificationType.CONSUMER_CLOSED +
+ "') AND " +
+ ManagementHelper.HDR_DISTANCE +
+ "<" +
+ flowRecord.getMaxHops() +
+ " AND (" +
+ ManagementHelper.HDR_ADDRESS +
+ " LIKE '" +
+ flowRecord.getAddress() +
+ "%')");
+
+ // The queue can't be temporary, since if the node with the bridge crashes then is restarted quickly
+ // it might get deleted on the target when it does connection cleanup
+
+ // When the backup activates the queue might already exist, so we catch this and ignore
+ try
+ {
+ session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
+ }
+ catch (HornetQException me)
+ {
+ if (me.getCode() == HornetQException.QUEUE_EXISTS)
+ {
+ // Ok
+ }
+ else
+ {
+ throw me;
+ }
+ }
+
+ notifConsumer = session.createConsumer(notifQueueName);
+
+ notifConsumer.setMessageHandler(flowRecord);
+
+ session.start();
+
+ ClientMessage message = session.createClientMessage(false);
+
+ ManagementHelper.putOperationInvocation(message,
+ ResourceNames.CORE_SERVER,
+ "sendQueueInfoToQueue",
+ notifQueueName.toString(),
+ flowRecord.getAddress());
+
+ ClientProducer prod = session.createProducer(managementAddress);
+
+ prod.send(message);
+ }
+ }
+
private synchronized boolean createObjects()
{
if (!started)
@@ -507,9 +620,7 @@
}
try
- {
- queue.addConsumer(BridgeImpl.this);
-
+ {
csf = null;
if (discoveryAddress != null)
{
@@ -540,92 +651,22 @@
session.setSendAcknowledgementHandler(BridgeImpl.this);
- // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
- // connection is opened and closed - we can use
- // a callback to tell us that
- if (flowRecord != null)
- {
- // Get the queue data
-
- // Create a queue to catch the notifications - the name must be deterministic on live and backup, but
- // different each time this is called
- // Otherwise it may already exist if server is restarted before it has been deleted on backup
-
- String qName = "notif." + nodeUUID.toString() + "." + name.toString();
-
- SimpleString notifQueueName = new SimpleString(qName);
-
- SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
- BindingType.DIVERT.toInt() +
- " AND " +
- ManagementHelper.HDR_NOTIFICATION_TYPE +
- " IN ('" +
- NotificationType.BINDING_ADDED +
- "','" +
- NotificationType.BINDING_REMOVED +
- "','" +
- NotificationType.CONSUMER_CREATED +
- "','" +
- NotificationType.CONSUMER_CLOSED +
- "') AND " +
- ManagementHelper.HDR_DISTANCE +
- "<" +
- flowRecord.getMaxHops() +
- " AND (" +
- ManagementHelper.HDR_ADDRESS +
- " LIKE '" +
- flowRecord.getAddress() +
- "%')");
-
- // The queue can't be temporary, since if the node with the bridge crashes then is restarted quickly
- // it might get deleted on the target when it does connection cleanup
-
- // When the backup activates the queue might already exist, so we catch this and ignore
- try
- {
- session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
- }
- catch (HornetQException me)
- {
- if (me.getCode() == HornetQException.QUEUE_EXISTS)
- {
- // Ok
- }
- else
- {
- throw me;
- }
- }
-
- ClientConsumer notifConsumer = session.createConsumer(notifQueueName);
-
- notifConsumer.setMessageHandler(flowRecord);
-
- session.start();
-
- ClientMessage message = session.createClientMessage(false);
-
- ManagementHelper.putOperationInvocation(message,
- ResourceNames.CORE_SERVER,
- "sendQueueInfoToQueue",
- notifQueueName.toString(),
- flowRecord.getAddress());
-
- ClientProducer prod = session.createProducer(managementAddress);
-
- prod.send(message);
- }
-
+ setupNotificationConsumer();
+
active = true;
+
+ queue.addConsumer(BridgeImpl.this);
queue.deliverAsync(executor);
+
+ log.info("Bridge " + name + " is now connected to destination ");
return true;
}
catch (Exception e)
{
- log.warn("Unable to connect. Bridge is now disabled.", e);
-
+ log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.");
+
return false;
}
}
@@ -667,54 +708,56 @@
}
}
- private class FailRunnable implements Runnable
- {
- public void run()
- {
- synchronized (BridgeImpl.this)
- {
- if (!started)
- {
- return;
- }
+// private class FailRunnable implements Runnable
+// {
+// public void run()
+// {
+// synchronized (BridgeImpl.this)
+// {
+//
+// if (!started)
+// {
+// return;
+// }
+//
+// if (flowRecord != null)
+// {
+// try
+// {
+// // flowRecord.reset();
+// }
+// catch (Exception e)
+// {
+// log.error("Failed to reset", e);
+// }
+// }
+//
+// active = false;
+// }
+//
+// try
+// {
+// queue.removeConsumer(BridgeImpl.this);
+//
+// session.cleanUp();
+//
+// cancelRefs();
+//
+// csf.close();
+// }
+// catch (Exception e)
+// {
+// log.error("Failed to stop", e);
+// }
+//
+// if (!createObjects())
+// {
+// started = false;
+// }
+// }
+// }
+// }
- if (flowRecord != null)
- {
- try
- {
- // flowRecord.reset();
- }
- catch (Exception e)
- {
- log.error("Failed to reset", e);
- }
- }
-
- active = false;
- }
-
- try
- {
- queue.removeConsumer(BridgeImpl.this);
-
- session.cleanUp();
-
- cancelRefs();
-
- csf.close();
- }
- catch (Exception e)
- {
- log.error("Failed to stop", e);
- }
-
- if (!createObjects())
- {
- started = false;
- }
- }
- }
-
private class CreateObjectsRunnable implements Runnable
{
public synchronized void run()
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Distributor;
@@ -23,6 +24,8 @@
*/
public abstract class DistributorImpl implements Distributor
{
+ private static final Logger log = Logger.getLogger(DistributorImpl.class);
+
protected final List<Consumer> consumers = new ArrayList<Consumer>();
public void addConsumer(Consumer consumer)
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -75,6 +75,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.security.impl.SecurityStoreImpl;
+import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MemoryManager;
@@ -179,17 +180,19 @@
private Deployer securityDeployer;
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
-
+
private final Object initialiseLock = new Object();
private boolean initialised;
-
+
private int managementConnectorID;
private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
-
+
private ConnectionManager replicatingConnectionManager;
+ private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -273,6 +276,8 @@
// so it can be initialised by the live node
remotingService.start();
+ started = true;
+
log.info("HornetQ Server version " + getVersion().getFullVersion() + " started");
}
@@ -334,16 +339,26 @@
managementService.stop();
- storageManager.stop();
+ if (storageManager != null)
+ {
+ storageManager.stop();
+ }
if (securityManager != null)
{
securityManager.stop();
}
-
- resourceManager.stop();
- postOffice.stop();
+ if (resourceManager != null)
+ {
+ resourceManager.stop();
+ }
+
+ if (postOffice != null)
+ {
+ postOffice.stop();
+ }
+
// Need to shutdown pools before shutting down paging manager to make sure everything is written ok
List<Runnable> tasks = scheduledPool.shutdownNow();
@@ -369,7 +384,10 @@
scheduledPool = null;
threadPool = null;
- pagingManager.stop();
+ if (pagingManager != null)
+ {
+ pagingManager.stop();
+ }
memoryManager.stop();
@@ -390,7 +408,7 @@
initialised = false;
uuid = null;
nodeID = null;
-
+
log.info("HornetQ Server version " + getVersion().getFullVersion() + " stopped");
}
@@ -487,7 +505,7 @@
}
public CreateSessionResponseMessage createSession(final String name,
- final long channelID,
+ final long channelID,
final String username,
final String password,
final int minLargeMessageSize,
@@ -498,7 +516,7 @@
final boolean preAcknowledge,
final boolean xa,
final int sendWindowSize) throws Exception
- {
+ {
if (version.getIncrementingVersion() != incrementingVersion)
{
log.warn("Client with version " + incrementingVersion +
@@ -511,11 +529,11 @@
"interoperate properly");
return null;
}
-
+
if (!checkActivate())
{
- //Backup server is not ready to accept connections
-
+ // Backup server is not ready to accept connections
+
return new CreateSessionResponseMessage(false, version.getIncrementingVersion());
}
@@ -532,7 +550,7 @@
Channel channel = connection.getChannel(channelID, sendWindowSize, false);
- final ServerSessionImpl session = new ServerSessionImpl(name,
+ final ServerSessionImpl session = new ServerSessionImpl(name,
username,
password,
minLargeMessageSize,
@@ -552,7 +570,7 @@
queueFactory,
this,
configuration.getManagementAddress());
-
+
sessions.put(name, session);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
@@ -602,120 +620,119 @@
}
}
-// public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
-// {
-// if (theUUID == null)
-// {
-// throw new IllegalArgumentException("node id is null");
-// }
-//
-// synchronized (initialiseLock)
-// {
-// if (initialised)
-// {
-// throw new IllegalStateException("Server is already initialised");
-// }
-//
-// this.uuid = theUUID;
-//
-// this.nodeID = new SimpleString(uuid.toString());
-//
-// initialisePart2();
-//
-// long backupID = storageManager.getCurrentUniqueID();
-//
-// if (liveUniqueID != backupID)
-// {
-// initialised = false;
-//
-// throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
-// ":" +
-// backupID +
-// "). You're probably trying to restart a live backup pair after a crash");
-// }
-//
-// log.info("Backup server is now operational");
-// }
-// }
-
-// private boolean setupReplicatingConnection() throws Exception
-// {
-// String backupConnectorName = configuration.getBackupConnectorName();
-//
-// if (backupConnectorName != null)
-// {
-// TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
-//
-// if (backupConnector == null)
-// {
-// log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
-// }
-// else
-// {
-// replicatingConnectionManager = new ConnectionManagerImpl(null,
-// backupConnector,
-// null,
-// false,
-// 1,
-// ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
-// ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-// ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
-// 0,
-// 1.0d,
-// 0,
-// threadPool,
-// scheduledPool);
-//
-// replicatingConnection = replicatingConnectionManager.getConnection(1);
-//
-// if (replicatingConnection != null)
-// {
-// replicatingChannel = replicatingConnection.getChannel(2, -1, false);
-//
-// replicatingConnection.addFailureListener(new FailureListener()
-// {
-// public void connectionFailed(HornetQException me)
-// {
-// replicatingChannel.executeOutstandingDelayedResults();
-// }
-// });
-//
-// // First time we get channel we send a message down it informing the backup of our node id -
-// // backup and live must have the same node id
-//
-// Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
-//
-// final Future future = new Future();
-//
-// replicatingChannel.replicatePacket(packet, 1, new Runnable()
-// {
-// public void run()
-// {
-// future.run();
-// }
-// });
-//
-// // This may take a while especially if the journal is large
-// boolean ok = future.await(60000);
-//
-// if (!ok)
-// {
-// throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
-// }
-// }
-// else
-// {
-// log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
-//
-// return false;
-// }
-// }
-// }
-//
-// return true;
-// }
-
-
+ // public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
+ // {
+ // if (theUUID == null)
+ // {
+ // throw new IllegalArgumentException("node id is null");
+ // }
+ //
+ // synchronized (initialiseLock)
+ // {
+ // if (initialised)
+ // {
+ // throw new IllegalStateException("Server is already initialised");
+ // }
+ //
+ // this.uuid = theUUID;
+ //
+ // this.nodeID = new SimpleString(uuid.toString());
+ //
+ // initialisePart2();
+ //
+ // long backupID = storageManager.getCurrentUniqueID();
+ //
+ // if (liveUniqueID != backupID)
+ // {
+ // initialised = false;
+ //
+ // throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
+ // ":" +
+ // backupID +
+ // "). You're probably trying to restart a live backup pair after a crash");
+ // }
+ //
+ // log.info("Backup server is now operational");
+ // }
+ // }
+
+ // private boolean setupReplicatingConnection() throws Exception
+ // {
+ // String backupConnectorName = configuration.getBackupConnectorName();
+ //
+ // if (backupConnectorName != null)
+ // {
+ // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+ //
+ // if (backupConnector == null)
+ // {
+ // log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
+ // }
+ // else
+ // {
+ // replicatingConnectionManager = new ConnectionManagerImpl(null,
+ // backupConnector,
+ // null,
+ // false,
+ // 1,
+ // ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ // ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ // ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ // 0,
+ // 1.0d,
+ // 0,
+ // threadPool,
+ // scheduledPool);
+ //
+ // replicatingConnection = replicatingConnectionManager.getConnection(1);
+ //
+ // if (replicatingConnection != null)
+ // {
+ // replicatingChannel = replicatingConnection.getChannel(2, -1, false);
+ //
+ // replicatingConnection.addFailureListener(new FailureListener()
+ // {
+ // public void connectionFailed(HornetQException me)
+ // {
+ // replicatingChannel.executeOutstandingDelayedResults();
+ // }
+ // });
+ //
+ // // First time we get channel we send a message down it informing the backup of our node id -
+ // // backup and live must have the same node id
+ //
+ // Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+ //
+ // final Future future = new Future();
+ //
+ // replicatingChannel.replicatePacket(packet, 1, new Runnable()
+ // {
+ // public void run()
+ // {
+ // future.run();
+ // }
+ // });
+ //
+ // // This may take a while especially if the journal is large
+ // boolean ok = future.await(60000);
+ //
+ // if (!ok)
+ // {
+ // throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
+ // }
+ // }
+ // else
+ // {
+ // log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+ //
+ // return false;
+ // }
+ // }
+ // }
+ //
+ // return true;
+ // }
+
public HornetQServerControlImpl getHornetQServerControl()
{
return messagingServerControl;
@@ -827,6 +844,16 @@
postOffice.removeBinding(queueName);
}
+ public synchronized void registerActivateCallback(final ActivateCallback callback)
+ {
+ activateCallbacks.add(callback);
+ }
+
+ public synchronized void unregisterActivateCallback(final ActivateCallback callback)
+ {
+ activateCallbacks.remove(callback);
+ }
+
public ExecutorFactory getExecutorFactory()
{
return executorFactory;
@@ -852,26 +879,37 @@
// Private
// --------------------------------------------------------------------------------------
+ private synchronized void callActivateCallbacks()
+ {
+ for (ActivateCallback callback : activateCallbacks)
+ {
+ callback.activated();
+ }
+ }
+
private synchronized boolean checkActivate() throws Exception
- {
+ {
if (configuration.isBackup())
{
- //Handle backup server activation
-
+ // Handle backup server activation
+
if (configuration.isSharedStore())
{
- //Complete the startup procedure
-
+ // Complete the startup procedure
+
+ log.info("Activating server");
+
configuration.setBackup(false);
-
- initialisePart2();
+
+ initialisePart2();
}
else
{
- //just load journal
+ // TODO
+ // just load journal
}
}
-
+
return true;
}
@@ -951,7 +989,7 @@
managementService,
configuration.getMessageExpiryScanPeriod(),
configuration.getMessageExpiryThreadPriority(),
- configuration.isWildcardRoutingEnabled(),
+ configuration.isWildcardRoutingEnabled(),
configuration.getIDCacheSize(),
configuration.isPersistIDCache(),
executorFactory,
@@ -1024,6 +1062,10 @@
queueDeployer.start();
}
+ // We need to call this here: it gives any dependent server a chance to deploy its own destinations,
+ // and that needs to be done before clustering is initialised
+ callActivateCallbacks();
+
// Deploy any pre-defined diverts
deployDiverts();
@@ -1036,7 +1078,7 @@
scheduledPool,
managementService,
configuration,
- uuid,
+ uuid,
configuration.isBackup());
clusterManager.start();
@@ -1050,9 +1092,9 @@
pagingManager.resumeDepages();
final ServerInfo dumper = new ServerInfo(this, pagingManager);
-
+
long dumpInfoInterval = configuration.getServerDumpInterval();
-
+
if (dumpInfoInterval > 0)
{
scheduledPool.scheduleWithFixedDelay(new Runnable()
@@ -1063,10 +1105,8 @@
}
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}
-
+
initialised = true;
-
- started = true;
}
private void deployQueuesFromConfiguration() throws Exception
@@ -1079,7 +1119,7 @@
config.isDurable());
}
}
-
+
private void loadJournal() throws Exception
{
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
@@ -1294,5 +1334,5 @@
}
// Inner classes
- // --------------------------------------------------------------------------------
+ // --------------------------------------------------------------------------------
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1260,6 +1260,7 @@
promptDelivery = false;
return;
}
+
continue;
}
else
@@ -1307,7 +1308,7 @@
}
HandleStatus status = handle(reference, consumer);
-
+
if (status == HandleStatus.HANDLED)
{
if (iterator == null)
@@ -1335,6 +1336,7 @@
{
groups.remove(consumer);
}
+
continue;
}
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1377,11 +1377,19 @@
}
public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
- {
- try
+ {
+ ServerConsumer consumer = consumers.get(packet.getConsumerID());
+
+ if (consumer == null)
{
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ log.error("There is no consumer with id " + packet.getConsumerID());
+ return;
}
+
+ try
+ {
+ consumer.receiveCredits(packet.getCredits());
+ }
catch (Exception e)
{
log.error("Failed to receive credits " + this.server.getConfiguration().isBackup(), e);
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -654,7 +654,17 @@
{
sessionFactory.setReconnectAttempts(reconnectAttempts);
}
+
+ public synchronized boolean isUseReattach()
+ {
+ return sessionFactory.isUseReattach();
+ }
+ public synchronized void setUseReattach(boolean reattach)
+ {
+ sessionFactory.setUseReattach(reattach);
+ }
+
public synchronized boolean isFailoverOnServerShutdown()
{
return sessionFactory.isFailoverOnServerShutdown();
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -133,7 +133,7 @@
{
return;
}
-
+
if (!contextSet)
{
context = new InitialContext();
@@ -141,6 +141,8 @@
deploymentManager = new FileDeploymentManager(server.getConfiguration().getFileDeployerScanPeriod());
+ server.registerActivateCallback(this);
+
server.start();
started = true;
Deleted: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,178 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.management.impl;
-
-import java.util.List;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.management.ConnectionFactoryControl;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class ConnectionFactoryControlImpl implements ConnectionFactoryControl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final HornetQConnectionFactory cf;
-
- private final List<String> bindings;
-
- private final String name;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ConnectionFactoryControlImpl(final HornetQConnectionFactory cf, final String name, final List<String> bindings) throws NotCompliantMBeanException
- {
- this.cf = cf;
- this.name = name;
- this.bindings = bindings;
- }
-
- // Public --------------------------------------------------------
-
- // ManagedConnectionFactoryMBean implementation ------------------
-
- public List<String> getBindings()
- {
- return bindings;
- }
-
- public String getClientID()
- {
- return cf.getClientID();
- }
-
- public long getClientFailureCheckPeriod()
- {
- return cf.getClientFailureCheckPeriod();
- }
-
- public long getCallTimeout()
- {
- return cf.getCallTimeout();
- }
-
- public int getConsumerMaxRate()
- {
- return cf.getConsumerMaxRate();
- }
-
- public int getConsumerWindowSize()
- {
- return cf.getConsumerWindowSize();
- }
-
- public int getProducerMaxRate()
- {
- return cf.getProducerMaxRate();
- }
-
- public int getProducerWindowSize()
- {
- return cf.getProducerWindowSize();
- }
-
- public int getDupsOKBatchSize()
- {
- return cf.getDupsOKBatchSize();
- }
-
- public boolean isBlockOnAcknowledge()
- {
- return cf.isBlockOnAcknowledge();
- }
-
- public boolean isBlockOnNonPersistentSend()
- {
- return cf.isBlockOnNonPersistentSend();
- }
-
- public boolean isBlockOnPersistentSend()
- {
- return cf.isBlockOnPersistentSend();
- }
-
- public boolean isPreAcknowledge()
- {
- return cf.isPreAcknowledge();
- }
-
- public String getName()
- {
- return name;
- }
-
- public long getConnectionTTL()
- {
- return cf.getConnectionTTL();
- }
-
- public int getMaxConnections()
- {
- return cf.getMaxConnections();
- }
-
- public int getReconnectAttempts()
- {
- return cf.getReconnectAttempts();
- }
-
- public boolean isFailoverOnNodeShutdown()
- {
- return cf.isFailoverOnServerShutdown();
- }
-
- public long getMinLargeMessageSize()
- {
- return cf.getMinLargeMessageSize();
- }
-
- public long getRetryInterval()
- {
- return cf.getRetryInterval();
- }
-
- public double getRetryIntervalMultiplier()
- {
- return cf.getRetryIntervalMultiplier();
- }
-
- public long getTransactionBatchSize()
- {
- return cf.getTransactionBatchSize();
- }
-
- public boolean isAutoGroup()
- {
- return cf.isAutoGroup();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Copied: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java (from rev 7946, branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/ConnectionFactoryControlImpl.java)
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java (rev 0)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.management.impl;
+
+import java.util.List;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.management.ConnectionFactoryControl;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class JMSConnectionFactoryControlImpl extends StandardMBean implements ConnectionFactoryControl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final HornetQConnectionFactory cf;
+
+ private final List<String> bindings;
+
+ private final String name;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public JMSConnectionFactoryControlImpl(final HornetQConnectionFactory cf, final String name, final List<String> bindings) throws NotCompliantMBeanException
+ {
+ super(ConnectionFactoryControl.class);
+ this.cf = cf;
+ this.name = name;
+ this.bindings = bindings;
+ }
+
+ // Public --------------------------------------------------------
+
+ // ManagedConnectionFactoryMBean implementation ------------------
+
+ public List<String> getBindings()
+ {
+ return bindings;
+ }
+
+ public String getClientID()
+ {
+ return cf.getClientID();
+ }
+
+ public long getClientFailureCheckPeriod()
+ {
+ return cf.getClientFailureCheckPeriod();
+ }
+
+ public long getCallTimeout()
+ {
+ return cf.getCallTimeout();
+ }
+
+ public int getConsumerMaxRate()
+ {
+ return cf.getConsumerMaxRate();
+ }
+
+ public int getConsumerWindowSize()
+ {
+ return cf.getConsumerWindowSize();
+ }
+
+ public int getProducerMaxRate()
+ {
+ return cf.getProducerMaxRate();
+ }
+
+ public int getProducerWindowSize()
+ {
+ return cf.getProducerWindowSize();
+ }
+
+ public int getDupsOKBatchSize()
+ {
+ return cf.getDupsOKBatchSize();
+ }
+
+ public boolean isBlockOnAcknowledge()
+ {
+ return cf.isBlockOnAcknowledge();
+ }
+
+ public boolean isBlockOnNonPersistentSend()
+ {
+ return cf.isBlockOnNonPersistentSend();
+ }
+
+ public boolean isBlockOnPersistentSend()
+ {
+ return cf.isBlockOnPersistentSend();
+ }
+
+ public boolean isPreAcknowledge()
+ {
+ return cf.isPreAcknowledge();
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public long getConnectionTTL()
+ {
+ return cf.getConnectionTTL();
+ }
+
+ public int getMaxConnections()
+ {
+ return cf.getMaxConnections();
+ }
+
+ public int getReconnectAttempts()
+ {
+ return cf.getReconnectAttempts();
+ }
+
+ public boolean isFailoverOnNodeShutdown()
+ {
+ return cf.isFailoverOnServerShutdown();
+ }
+
+ public long getMinLargeMessageSize()
+ {
+ return cf.getMinLargeMessageSize();
+ }
+
+ public long getRetryInterval()
+ {
+ return cf.getRetryInterval();
+ }
+
+ public double getRetryIntervalMultiplier()
+ {
+ return cf.getRetryIntervalMultiplier();
+ }
+
+ public long getTransactionBatchSize()
+ {
+ return cf.getTransactionBatchSize();
+ }
+
+ public boolean isAutoGroup()
+ {
+ return cf.isAutoGroup();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -106,7 +106,7 @@
{
ObjectName objectName = ObjectNames.getJMSTopicObjectName(topic.getTopicName());
AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
- TopicControlImpl control = new TopicControlImpl(topic, addressControl, jndiBinding, managementService);
+ JMSTopicControlImpl control = new JMSTopicControlImpl(topic, addressControl, jndiBinding, managementService);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control);
}
@@ -123,7 +123,7 @@
final List<String> bindings) throws Exception
{
ObjectName objectName = ObjectNames.getConnectionFactoryObjectName(name);
- ConnectionFactoryControlImpl control = new ConnectionFactoryControlImpl(connectionFactory, name, bindings);
+ JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name, bindings);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name, control);
}
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -15,6 +15,8 @@
import java.util.Map;
+import javax.management.StandardMBean;
+
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.MessageCounterInfo;
@@ -34,7 +36,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class JMSQueueControlImpl implements JMSQueueControl
+public class JMSQueueControlImpl extends StandardMBean implements JMSQueueControl
{
// Constants -----------------------------------------------------
@@ -57,7 +59,8 @@
*/
public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
{
- return (selectorStr == null || selectorStr.trim().length() == 0) ? null : SelectorTranslator.convertToHornetQFilterString(selectorStr);
+ return (selectorStr == null || selectorStr.trim().length() == 0) ? null
+ : SelectorTranslator.convertToHornetQFilterString(selectorStr);
}
private static String createFilterForJMSMessageID(String jmsMessageID) throws Exception
@@ -71,7 +74,7 @@
for (int i = 0; i < messages.length; i++)
{
Map<String, Object> message = messages[i];
- array.put(new JSONObject(message));
+ array.put(new JSONObject(message));
}
return array.toString();
}
@@ -79,10 +82,11 @@
// Constructors --------------------------------------------------
public JMSQueueControlImpl(final HornetQQueue managedQueue,
- final QueueControl coreQueueControl,
- final String jndiBinding,
- final MessageCounter counter)
+ final QueueControl coreQueueControl,
+ final String jndiBinding,
+ final MessageCounter counter) throws Exception
{
+ super(JMSQueueControl.class);
this.managedQueue = managedQueue;
this.coreQueueControl = coreQueueControl;
this.binding = jndiBinding;
@@ -187,10 +191,10 @@
String filter = createFilterFromJMSSelector(filterStr);
Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
- Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
-
+ Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+
int i = 0;
-
+
for (Map<String, Object> coreMessage : coreMessages)
{
Map<String, Object> jmsMessage = HornetQMessage.coreMaptoJMSMap(coreMessage);
@@ -203,7 +207,7 @@
throw new IllegalStateException(e.getMessage());
}
}
-
+
public String listMessagesAsJSON(String filter) throws Exception
{
return toJSON(listMessages(filter));
Modified: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -25,6 +25,7 @@
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
+import javax.management.StandardMBean;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.config.TransportConfiguration;
@@ -39,7 +40,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class JMSServerControlImpl implements JMSServerControl, NotificationEmitter
+public class JMSServerControlImpl extends StandardMBean implements JMSServerControl, NotificationEmitter
{
// Constants -----------------------------------------------------
@@ -132,8 +133,9 @@
// Constructors --------------------------------------------------
- public JMSServerControlImpl(final JMSServerManager server)
+ public JMSServerControlImpl(final JMSServerManager server) throws Exception
{
+ super(JMSServerControl.class);
this.server = server;
broadcaster = new NotificationBroadcasterSupport();
}
Copied: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java (from rev 7946, branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java)
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java (rev 0)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.management.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.StandardMBean;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.AddressControl;
+import org.hornetq.core.management.HornetQServerControl;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.management.QueueControl;
+import org.hornetq.core.management.ResourceNames;
+import org.hornetq.jms.HornetQTopic;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.SelectorTranslator;
+import org.hornetq.jms.server.management.TopicControl;
+import org.hornetq.utils.Pair;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * Management control for a JMS topic, exposed as a {@link StandardMBean}.
+ * Subscriptions are the core queues reported by the topic's
+ * {@link AddressControl}, except the queue named after the address itself.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class JMSTopicControlImpl extends StandardMBean implements TopicControl
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(JMSTopicControlImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private final HornetQTopic managedTopic;
+
+   private final String binding;
+
+   private final AddressControl addressControl;
+
+   private final ManagementService managementService;
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Converts a JMS selector into a core filter string.
+    *
+    * @param selectorStr the JMS selector, may be null
+    * @return the converted filter, or null when the selector is null or blank
+    */
+   public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
+   {
+      if (selectorStr == null || selectorStr.trim().length() == 0)
+      {
+         return null;
+      }
+      return SelectorTranslator.convertToHornetQFilterString(selectorStr);
+   }
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param topic the JMS topic being managed
+    * @param addressControl control of the core address whose queues are the subscriptions
+    * @param jndiBinding value reported by {@link #getJNDIBinding()}
+    * @param managementService registry used to look up core queue and server controls
+    */
+   public JMSTopicControlImpl(final HornetQTopic topic,
+                              final AddressControl addressControl,
+                              final String jndiBinding,
+                              final ManagementService managementService) throws Exception
+   {
+      super(TopicControl.class);
+      this.managedTopic = topic;
+      this.addressControl = addressControl;
+      this.binding = jndiBinding;
+      this.managementService = managementService;
+   }
+
+   // TopicControlMBean implementation ------------------------------
+
+   public String getName()
+   {
+      return managedTopic.getName();
+   }
+
+   public boolean isTemporary()
+   {
+      return managedTopic.isTemporary();
+   }
+
+   public String getAddress()
+   {
+      return managedTopic.getAddress();
+   }
+
+   public String getJNDIBinding()
+   {
+      return binding;
+   }
+
+   /** Sums the message counts of all subscriptions, durable or not. */
+   public int getMessageCount()
+   {
+      return getMessageCount(DurabilityType.ALL);
+   }
+
+   public int getDurableMessageCount()
+   {
+      return getMessageCount(DurabilityType.DURABLE);
+   }
+
+   public int getNonDurableMessageCount()
+   {
+      return getMessageCount(DurabilityType.NON_DURABLE);
+   }
+
+   public int getSubscriptionCount()
+   {
+      return getQueues(DurabilityType.ALL).size();
+   }
+
+   public int getDurableSubscriptionCount()
+   {
+      return getQueues(DurabilityType.DURABLE).size();
+   }
+
+   public int getNonDurableSubscriptionCount()
+   {
+      return getQueues(DurabilityType.NON_DURABLE).size();
+   }
+
+   /**
+    * Lists one Object[6] per subscription: queue name, client ID, subscription
+    * name, durable flag, message count and selector. Client ID and name are
+    * null for non-durable subscriptions.
+    */
+   public Object[] listAllSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.ALL);
+   }
+
+   public String listAllSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.ALL);
+   }
+
+   public Object[] listDurableSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.DURABLE);
+   }
+
+   public String listDurableSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
+   }
+
+   public Object[] listNonDurableSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.NON_DURABLE);
+   }
+
+   public String listNonDurableSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
+   }
+
+   /**
+    * Lists the messages of one subscription, converted to their JMS map form.
+    *
+    * @param queueName name of the core queue backing the subscription
+    * @throws IllegalArgumentException if no queue control is registered under that name
+    */
+   public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception
+   {
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName);
+      }
+
+      Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
+      Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+      for (int i = 0; i < coreMessages.length; i++)
+      {
+         jmsMessages[i] = HornetQMessage.coreMaptoJMSMap(coreMessages[i]);
+      }
+      return jmsMessages;
+   }
+
+   public String listMessagesForSubscriptionAsJSON(String queueName) throws Exception
+   {
+      return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
+   }
+
+   /**
+    * Counts the messages of a durable subscription that match a JMS selector.
+    *
+    * @throws IllegalArgumentException if no queue control exists for the subscription
+    */
+   public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+   {
+      String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+      }
+      String filter = createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.listMessages(filter).length;
+   }
+
+   /**
+    * Removes the messages matching a JMS selector from every queue bound to
+    * the topic's address.
+    *
+    * @return the total number of messages removed
+    */
+   public int removeMessages(String filterStr) throws Exception
+   {
+      String filter = createFilterFromJMSSelector(filterStr);
+      int count = 0;
+      for (String queue : addressControl.getQueueNames())
+      {
+         QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+         // getResource can return null (see the checks above); skip such a
+         // queue instead of failing the whole operation with a NullPointerException
+         if (coreQueueControl != null)
+         {
+            count += coreQueueControl.removeMessages(filter);
+         }
+      }
+      return count;
+   }
+
+   /**
+    * Destroys the core queue backing a durable subscription.
+    *
+    * @throws IllegalArgumentException if no queue control exists for the subscription
+    */
+   public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
+   {
+      String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+      }
+      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+      serverControl.destroyQueue(queueName);
+   }
+
+   /** Destroys every queue bound to the topic's address. */
+   public void dropAllSubscriptions() throws Exception
+   {
+      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+      for (String queue : addressControl.getQueueNames())
+      {
+         serverControl.destroyQueue(queue);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /** Builds the Object[6] rows returned by the list*Subscriptions() operations. */
+   private Object[] listSubscribersInfos(final DurabilityType durability)
+   {
+      List<QueueControl> queues = getQueues(durability);
+      List<Object[]> subInfos = new ArrayList<Object[]>(queues.size());
+
+      for (QueueControl queue : queues)
+      {
+         String clientID = null;
+         String subName = null;
+
+         // Client ID and subscription name are only decoded for durable queues
+         if (queue.isDurable())
+         {
+            Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
+            clientID = pair.a;
+            subName = pair.b;
+         }
+
+         Object[] subscriptionInfo = new Object[6];
+         subscriptionInfo[0] = queue.getName();
+         subscriptionInfo[1] = clientID;
+         subscriptionInfo[2] = subName;
+         subscriptionInfo[3] = queue.isDurable();
+         subscriptionInfo[4] = queue.getMessageCount();
+         // Sixth slot: the selector, matching the "selector" entry of the JSON form
+         subscriptionInfo[5] = queue.getFilter();
+
+         subInfos.add(subscriptionInfo);
+      }
+      return subInfos.toArray(new Object[subInfos.size()]);
+   }
+
+   /** Builds the JSON array returned by the list*SubscriptionsAsJSON() operations. */
+   private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
+   {
+      JSONArray array = new JSONArray();
+
+      for (QueueControl queue : getQueues(durability))
+      {
+         String clientID = null;
+         String subName = null;
+
+         if (queue.isDurable())
+         {
+            Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
+            clientID = pair.a;
+            subName = pair.b;
+         }
+
+         JSONObject info = new JSONObject();
+         info.put("queueName", queue.getName());
+         info.put("clientID", clientID);
+         info.put("selector", queue.getFilter());
+         info.put("name", subName);
+         info.put("durable", queue.isDurable());
+         info.put("messageCount", queue.getMessageCount());
+         array.put(info);
+      }
+
+      return array.toString();
+   }
+
+   private int getMessageCount(final DurabilityType durability)
+   {
+      int count = 0;
+      for (QueueControl queue : getQueues(durability))
+      {
+         count += queue.getMessageCount();
+      }
+      return count;
+   }
+
+   /**
+    * Collects the controls of the subscription queues matching the requested
+    * durability. Best effort: a failure is logged and yields an empty list.
+    */
+   private List<QueueControl> getQueues(final DurabilityType durability)
+   {
+      try
+      {
+         List<QueueControl> matchingQueues = new ArrayList<QueueControl>();
+         for (String queue : addressControl.getQueueNames())
+         {
+            QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+
+            // Skip a queue whose control is not registered, so that one missing
+            // control no longer empties the whole result via the catch below
+            if (coreQueueControl == null)
+            {
+               continue;
+            }
+
+            // Ignore the "special" subscription
+            if (!coreQueueControl.getName().equals(addressControl.getAddress()))
+            {
+               if (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && coreQueueControl.isDurable()) ||
+                   (durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()))
+               {
+                  matchingQueues.add(coreQueueControl);
+               }
+            }
+         }
+         return matchingQueues;
+      }
+      catch (Exception e)
+      {
+         // Still best effort, but the failure is no longer swallowed silently
+         log.warn("Unable to retrieve the subscription queues of topic " + managedTopic.getName(), e);
+         return Collections.emptyList();
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+   private enum DurabilityType
+   {
+      ALL, DURABLE, NON_DURABLE
+   }
+}
Deleted: branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java
===================================================================
--- branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/src/main/org/hornetq/jms/server/management/impl/TopicControlImpl.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,348 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.management.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.AddressControl;
-import org.hornetq.core.management.HornetQServerControl;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.QueueControl;
-import org.hornetq.core.management.ResourceNames;
-import org.hornetq.jms.HornetQTopic;
-import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.jms.client.SelectorTranslator;
-import org.hornetq.jms.server.management.TopicControl;
-import org.hornetq.utils.Pair;
-import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class TopicControlImpl implements TopicControl
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(TopicControlImpl.class);
-
- // Attributes ----------------------------------------------------
-
- private final HornetQTopic managedTopic;
-
- private final String binding;
-
- private AddressControl addressControl;
-
- private ManagementService managementService;
-
- // Static --------------------------------------------------------
-
- public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
- {
- return (selectorStr == null || selectorStr.trim().length() == 0) ? null : SelectorTranslator.convertToHornetQFilterString(selectorStr);
- }
-
- // Constructors --------------------------------------------------
-
- public TopicControlImpl(final HornetQTopic topic,
- final AddressControl addressControl,
- final String jndiBinding,
- final ManagementService managementService)
- {
- this.managedTopic = topic;
- this.addressControl = addressControl;
- this.binding = jndiBinding;
- this.managementService = managementService;
- }
-
- // TopicControlMBean implementation ------------------------------
-
- public String getName()
- {
- return managedTopic.getName();
- }
-
- public boolean isTemporary()
- {
- return managedTopic.isTemporary();
- }
-
- public String getAddress()
- {
- return managedTopic.getAddress();
- }
-
- public String getJNDIBinding()
- {
- return binding;
- }
-
- public int getMessageCount()
- {
- return getMessageCount(DurabilityType.ALL);
- }
-
- public int getDurableMessageCount()
- {
- return getMessageCount(DurabilityType.DURABLE);
- }
-
- public int getNonDurableMessageCount()
- {
- return getMessageCount(DurabilityType.NON_DURABLE);
- }
-
- public int getSubscriptionCount()
- {
- return getQueues(DurabilityType.ALL).size();
- }
-
- public int getDurableSubscriptionCount()
- {
- return getQueues(DurabilityType.DURABLE).size();
- }
-
- public int getNonDurableSubscriptionCount()
- {
- return getQueues(DurabilityType.NON_DURABLE).size();
- }
-
- public Object[] listAllSubscriptions()
- {
- return listSubscribersInfos(DurabilityType.ALL);
- }
-
- public String listAllSubscriptionsAsJSON() throws Exception
- {
- return listSubscribersInfosAsJSON(DurabilityType.ALL);
- }
-
- public Object[] listDurableSubscriptions()
- {
- return listSubscribersInfos(DurabilityType.DURABLE);
- }
-
- public String listDurableSubscriptionsAsJSON() throws Exception
- {
- return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
- }
-
- public Object[] listNonDurableSubscriptions()
- {
- return listSubscribersInfos(DurabilityType.NON_DURABLE);
- }
-
- public String listNonDurableSubscriptionsAsJSON() throws Exception
- {
- return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
- }
-
- public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception
- {
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null)
- {
- throw new IllegalArgumentException("No subscriptions with name " + queueName);
- }
-
- Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
-
- Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
-
- int i = 0;
-
- for (Map<String, Object> coreMessage : coreMessages)
- {
- jmsMessages[i++] = HornetQMessage.coreMaptoJMSMap(coreMessage);
- }
- return jmsMessages;
- }
-
- public String listMessagesForSubscriptionAsJSON(String queueName) throws Exception
- {
- return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
- }
-
- public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
- {
- String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null)
- {
- throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
- }
- String filter = createFilterFromJMSSelector(filterStr);
- return coreQueueControl.listMessages(filter).length;
- }
-
- public int removeMessages(String filterStr) throws Exception
- {
- String filter = createFilterFromJMSSelector(filterStr);
- int count = 0;
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues)
- {
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
- count += coreQueueControl.removeMessages(filter);
- }
-
- return count;
- }
-
- public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
- {
- String queueName = HornetQTopic.createQueueNameForDurableSubscription(clientID, subscriptionName);
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
- if (coreQueueControl == null)
- {
- throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
- }
- HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
- serverControl.destroyQueue(queueName);
- }
-
- public void dropAllSubscriptions() throws Exception
- {
- HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues)
- {
- serverControl.destroyQueue(queue);
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private Object[] listSubscribersInfos(final DurabilityType durability)
- {
- List<QueueControl> queues = getQueues(durability);
- List<Object[]> subInfos = new ArrayList<Object[]>(queues.size());
-
- for (QueueControl queue : queues)
- {
- String clientID = null;
- String subName = null;
-
- if (queue.isDurable())
- {
- Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
- clientID = pair.a;
- subName = pair.b;
- }
-
- String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
- Object[] subscriptionInfo = new Object[6];
- subscriptionInfo[0] = queue.getName();
- subscriptionInfo[1] = clientID;
- subscriptionInfo[2] = subName;
- subscriptionInfo[3] = queue.isDurable();
- subscriptionInfo[4] = queue.getMessageCount();
-
- subInfos.add(subscriptionInfo);
- }
- return subInfos.toArray(new Object[subInfos.size()]);
- }
-
- private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
- {
- List<QueueControl> queues = getQueues(durability);
- JSONArray array = new JSONArray();
-
- for (QueueControl queue : queues)
- {
- String clientID = null;
- String subName = null;
-
- if (queue.isDurable())
- {
- Pair<String, String> pair = HornetQTopic.decomposeQueueNameForDurableSubscription(queue.getName().toString());
- clientID = pair.a;
- subName = pair.b;
- }
-
- String filter = queue.getFilter() != null ? queue.getFilter() : null;
-
- JSONObject info = new JSONObject();
- info.put("queueName", queue.getName());
- info.put("clientID", clientID);
- info.put("selector", filter);
- info.put("name", subName);
- info.put("durable", queue.isDurable());
- info.put("messageCount", queue.getMessageCount());
- array.put(info);
- }
-
- return array.toString();
- }
-
- private int getMessageCount(final DurabilityType durability)
- {
- List<QueueControl> queues = getQueues(durability);
- int count = 0;
- for (QueueControl queue : queues)
- {
- count += queue.getMessageCount();
- }
- return count;
- }
-
- private List<QueueControl> getQueues(final DurabilityType durability)
- {
- try
- {
- List<QueueControl> matchingQueues = new ArrayList<QueueControl>();
- String[] queues = addressControl.getQueueNames();
- for (String queue : queues)
- {
- QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
-
- // Ignore the "special" subscription
- if (!coreQueueControl.getName().equals(addressControl.getAddress()))
- {
- if (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && coreQueueControl.isDurable()) ||
- (durability == DurabilityType.NON_DURABLE && !coreQueueControl.isDurable()))
- {
- matchingQueues.add(coreQueueControl);
- }
- }
- }
- return matchingQueues;
- }
- catch (Exception e)
- {
- return Collections.emptyList();
- }
- }
-
- // Inner classes -------------------------------------------------
-
- private enum DurabilityType
- {
- ALL, DURABLE, NON_DURABLE
- }
-}
Modified: branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/config/ConfigurationTest-full-config.xml 2009-09-24 09:25:44 UTC (rev 7986)
@@ -25,8 +25,7 @@
<message-expiry-scan-period>10111213</message-expiry-scan-period>
<message-expiry-thread-priority>8</message-expiry-thread-priority>
<id-cache-size>127</id-cache-size>
- <persist-id-cache>true</persist-id-cache>
- <queue-activation-timeout>12456</queue-activation-timeout>
+ <persist-id-cache>true</persist-id-cache>
<backup>true</backup>
<shared-store>true</shared-store>
<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>
Modified: branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -120,7 +120,9 @@
protected void tearDown() throws Exception
{
super.tearDown();
+
getJmsServerManager().destroyConnectionFactory("testsuitecf");
+
cf = null;
assertRemainingMessages(0);
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -31,6 +31,7 @@
import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -49,6 +50,8 @@
*/
public class SessionFactoryTest extends ServiceTestBase
{
+ private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
+
private final String groupAddress = "230.1.2.3";
private final int groupPort = 8765;
@@ -62,14 +65,14 @@
private TransportConfiguration backupTC;
protected void tearDown() throws Exception
- {
+ {
if (liveService != null && liveService.isStarted())
- {
+ {
liveService.stop();
- }
+ }
if (backupService != null && backupService.isStarted())
- {
- liveService.stop();
+ {
+ backupService.stop();
}
liveService = null;
backupService = null;
@@ -106,7 +109,7 @@
{
try
{
- startLiveAndBackup();
+ startLiveAndBackup();
ClientSessionFactory cf = new ClientSessionFactoryImpl();
assertFactoryParams(cf,
null,
@@ -136,7 +139,7 @@
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
- ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+ ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
try
{
ClientSession session = cf.createSession(false, true, true);
@@ -144,8 +147,9 @@
}
catch (HornetQException e)
{
+ e.printStackTrace();
// Ok
- }
+ }
final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(this.liveTC,
this.backupTC);
@@ -854,10 +858,12 @@
{
if (liveService.isStarted())
{
+ log.info("stopping live");
liveService.stop();
}
if (backupService.isStarted())
{
+ log.info("stopping backup");
backupService.stop();
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -50,8 +50,7 @@
{
private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
-
- //Fail bridge and reconnecting immediately
+ // Fail bridge and reconnecting immediately
public void testFailoverAndReconnectImmediately() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -94,16 +93,16 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 1;
-
+
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), server2tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
- retryIntervalMultiplier,
+ retryIntervalMultiplier,
reconnectAttempts,
true,
false,
@@ -151,7 +150,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session0.createClientMessage(false);
+ ClientMessage message = session0.createClientMessage(true);
message.putIntProperty(propKey, i);
prod0.send(message);
@@ -175,7 +174,7 @@
assertEquals(0, server1.getRemotingService().getConnections().size());
assertEquals(0, service2.getRemotingService().getConnections().size());
}
-
+
// Fail bridge and attempt failover a few times before succeeding
public void testFailoverAndReconnectAfterAFewTries() throws Exception
{
@@ -225,8 +224,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -272,7 +271,7 @@
InVMConnector.numberOfFailures = reconnectAttempts - 1;
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- forwardingConnection = getForwardingConnection(bridge);
+ forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
final int numMessages = 10;
@@ -305,7 +304,7 @@
assertEquals(0, server1.getRemotingService().getConnections().size());
assertEquals(0, service2.getRemotingService().getConnections().size());
}
-
+
// Fail bridge and reconnect same node, no backup specified
public void testReconnectSameNode() throws Exception
{
@@ -344,8 +343,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -389,7 +388,7 @@
InVMConnector.numberOfFailures = reconnectAttempts - 1;
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- forwardingConnection = getForwardingConnection(bridge);
+ forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
final int numMessages = 10;
@@ -420,7 +419,7 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -458,8 +457,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -491,7 +490,7 @@
server1.stop();
server1.start();
-
+
ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
ClientSession session1 = csf1.createSession(false, true, true);
@@ -527,7 +526,7 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
public void testFailoverThenFailAgainAndReconnect() throws Exception
{
Map<String, Object> server0Params = new HashMap<String, Object>();
@@ -564,8 +563,8 @@
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
queueName0,
forwardAddress,
- null,
null,
+ null,
retryInterval,
retryIntervalMultiplier,
reconnectAttempts,
@@ -626,13 +625,13 @@
assertNotNull(r1);
assertEquals(i, r1.getProperty(propKey));
}
-
- //Fail again - should reconnect
+
+ // Fail again - should reconnect
forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = reconnectAttempts - 1;
forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session0.createClientMessage(false);
@@ -657,24 +656,24 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
private RemotingConnection getForwardingConnection(final Bridge bridge) throws Exception
{
long start = System.currentTimeMillis();
-
+
do
{
RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
-
+
if (forwardingConnection != null)
{
return forwardingConnection;
}
-
+
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < 50000);
-
+
throw new IllegalStateException("Failed to get forwarding connection");
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -224,11 +224,11 @@
forwardAddress,
null,
null,
- 1000,
+ 500,
1d,
-1,
- false,
true,
+ true,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -322,7 +322,11 @@
sf1.close();
+ log.info("stopping server 1");
+
server1.stop();
+
+ log.info("stopped server 1");
for (int i = 0; i < numMessages; i++)
{
@@ -332,8 +336,12 @@
producer0.send(message);
}
+
+ log.info("sent some more messages");
server1.start();
+
+ log.info("started server1");
sf1 = new ClientSessionFactoryImpl(server1tc);
@@ -342,6 +350,8 @@
consumer1 = session1.createConsumer(queueName1);
session1.start();
+
+ log.info("started session");
for (int i = 0; i < numMessages; i++)
{
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.cluster.bridge;
-import java.util.HashMap;
import java.util.Map;
import org.hornetq.core.config.Configuration;
@@ -35,29 +34,6 @@
*/
public abstract class BridgeTestBase extends UnitTestCase
{
- protected HornetQServer createHornetQServerNIO(final int id, final Map<String, Object> params)
- {
- return createHornetQServerNIO(id, params, false);
- }
-
- protected HornetQServer createHornetQServerNIO(final int id,
- final Map<String, Object> params,
- final boolean backup)
- {
- Configuration serviceConf = new ConfigurationImpl();
- serviceConf.setClustered(true);
- serviceConf.setSecurityEnabled(false);
- serviceConf.setBackup(backup);
- serviceConf.setJournalMinFiles(2);
- serviceConf.setJournalFileSize(100 * 1024);
- params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
- serviceConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
- params));
- HornetQServer service = HornetQ.newHornetQServer(serviceConf);
- return service;
- }
-
protected HornetQServer createHornetQServer(final int id, final Map<String, Object> params)
{
return createHornetQServer(id, params, false);
@@ -69,16 +45,18 @@
serviceConf.setClustered(true);
serviceConf.setSecurityEnabled(false);
serviceConf.setBackup(backup);
+ serviceConf.setSharedStore(true);
+ serviceConf.setBindingsDirectory(getBindingsDir(id, false));
+ serviceConf.setJournalMinFiles(2);
+ serviceConf.setJournalDirectory(getJournalDir(id, false));
+ serviceConf.setPagingDirectory(getPageDir(id, false));
+ serviceConf.setLargeMessagesDirectory(getLargeMessagesDir(id, false));
+
params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
serviceConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
- params));
- HornetQServer service = HornetQ.newHornetQServer(serviceConf, false);
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
+ HornetQServer service = HornetQ.newHornetQServer(serviceConf, true);
return service;
}
- protected HornetQServer createHornetQServer(final int id)
- {
- return this.createHornetQServer(id, new HashMap<String, Object>());
- }
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1421,8 +1421,13 @@
closeSessionFactory(3);
stopServers(0, 3);
+ log.info("stopped servers");
startServers(3, 0);
+
+ log.info("restarted servers");
+
+ Thread.sleep(2000);
setupSessionFactory(0, isNetty());
setupSessionFactory(3, isNetty());
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -32,15 +32,12 @@
{
if (packet.getType() == PacketImpl.SESS_SEND)
{
- try
- {
- Thread.sleep(2000);
- }
- catch (Exception e)
- {
- }
+ //Lose the send
+ return false;
}
-
- return true;
+ else
+ {
+ return true;
+ }
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -18,6 +18,7 @@
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
/**
* A DelayInterceptor2
@@ -32,16 +33,15 @@
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
- try
+ if (packet.getType() == PacketImpl.NULL_RESPONSE)
{
- Thread.sleep(2000);
+ //Lose the response from the commit
+
+ return false;
}
- catch (InterruptedException e)
+ else
{
+ return true;
}
-
- log.info("proceeding");
-
- return true;
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor3.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -35,16 +35,12 @@
{
if (packet.getType() == PacketImpl.SESS_COMMIT)
{
- log.info("got sess commit, delaying");
- try
- {
- Thread.sleep(2000);
- }
- catch (Exception e)
- {
- }
+ //lose the commit
+ return false;
}
-
- return false;
+ else
+ {
+ return true;
+ }
}
}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -472,8 +472,6 @@
RemotingConnection conn = ((ClientSessionInternal)session2).getConnection();
- log.info("Failing connection**");
-
// Simulate failure on connection
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
@@ -483,16 +481,12 @@
assertTrue(ok);
- log.info("** creating the consumer");
-
consumer = session2.createConsumer(ADDRESS);
for (int i = numMessages / 2; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
- log.info("got message " + message);
-
assertNotNull(message);
assertEquals("message" + i, message.getBody().readString());
@@ -1173,9 +1167,7 @@
assertTrue(ok);
- log.info("closing session");
session.close();
- log.info("closed session");
sf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(false));
@@ -1477,8 +1469,6 @@
assertTrue(ok);
- log.info("after failover");
-
for (int i = 0; i < numMessages; i++)
{
// Only the persistent messages will survive
@@ -1487,8 +1477,6 @@
{
ClientMessage message = consumer.receive(1000);
- log.info("got message " + i);
-
assertNotNull(message);
assertEquals("message" + i, message.getBody().readString());
@@ -1788,9 +1776,7 @@
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
- log.info("creating session");
final ClientSession session = sf.createSession(false, false);
- log.info("created session");
session.createQueue(ADDRESS, ADDRESS, null, true);
@@ -1840,23 +1826,18 @@
sf.addInterceptor(interceptor);
session.commit();
-
- log.info("Initial commit succeeded");
}
catch (HornetQException e)
{
if (e.getCode() == HornetQException.UNBLOCKED)
{
- log.info("commit unblocked");
-
// Ok - now we retry the commit after removing the interceptor
sf.removeInterceptor(interceptor);
try
{
- log.info("retrying commit");
- session.commit();
+ session.commit();
}
catch (HornetQException e2)
{
@@ -1958,9 +1939,7 @@
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
- log.info("creating session");
final ClientSession session = sf.createSession(false, false);
- log.info("created session");
session.createQueue(ADDRESS, ADDRESS, null, true);
@@ -2002,15 +1981,11 @@
server0Service.getRemotingService().addInterceptor(interceptor);
session.commit();
-
- log.info("Initial commit succeeded");
}
catch (HornetQException e)
{
if (e.getCode() == HornetQException.UNBLOCKED)
{
- log.info("commit unblocked");
-
// Ok - now we retry the commit after removing the interceptor
server0Service.getRemotingService().removeInterceptor(interceptor);
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -17,6 +17,7 @@
import java.util.Map;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -79,12 +80,12 @@
public void testAutomaticFailover() throws Exception
{
HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"),
- new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
-
+ new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
jbcf.setBlockOnPersistentSend(true);
jbcf.setBlockOnNonPersistentSend(true);
-
+
Connection conn = jbcf.createConnection();
MyExceptionListener listener = new MyExceptionListener();
@@ -99,7 +100,7 @@
SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
- coreSession.createQueue(jmsQueueName, jmsQueueName, null, false);
+ coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
Queue queue = sess.createQueue("myqueue");
@@ -107,6 +108,8 @@
MessageProducer producer = sess.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
MessageConsumer consumer = sess.createConsumer(queue);
for (int i = 0; i < numMessages; i++)
@@ -137,19 +140,20 @@
conn.close();
- assertNull(listener.e);
+ assertNotNull(listener.e);
+
+ assertTrue(me == listener.e.getCause());
}
public void testManualFailover() throws Exception
{
HornetQConnectionFactory jbcfLive = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
jbcfLive.setBlockOnNonPersistentSend(true);
jbcfLive.setBlockOnPersistentSend(true);
-
HornetQConnectionFactory jbcfBackup = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
+ backupParams));
jbcfBackup.setBlockOnNonPersistentSend(true);
jbcfBackup.setBlockOnPersistentSend(true);
@@ -167,7 +171,7 @@
SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
- coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, false);
+ coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, true);
Queue queue = sessLive.createQueue("myqueue");
@@ -200,11 +204,8 @@
Connection connBackup = jbcfBackup.createConnection();
- log.info("creating session on backup");
Session sessBackup = connBackup.createSession(false, Session.AUTO_ACKNOWLEDGE);
- log.info("created on backup");
-
MessageConsumer consumerBackup = sessBackup.createConsumer(queue);
connBackup.start();
@@ -238,24 +239,29 @@
backupConf.setSecurityEnabled(false);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
- backupParams));
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", backupParams));
backupConf.setBackup(true);
- backupService = HornetQ.newHornetQServer(backupConf, false);
+ backupConf.setSharedStore(true);
+ backupConf.setBindingsDirectory(getBindingsDir());
+ backupConf.setJournalMinFiles(2);
+ backupConf.setJournalDirectory(getJournalDir());
+ backupConf.setPagingDirectory(getPageDir());
+ backupConf.setLargeMessagesDirectory(getLargeMessagesDir());
+ backupService = HornetQ.newHornetQServer(backupConf, true);
backupService.start();
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration backupTC = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams,
- "backup-connector");
- connectors.put(backupTC.getName(), backupTC);
- liveConf.setConnectorConfigurations(connectors);
- liveConf.setBackupConnectorName(backupTC.getName());
- liveService = HornetQ.newHornetQServer(liveConf, false);
+ liveConf.setSharedStore(true);
+ liveConf.setBindingsDirectory(getBindingsDir());
+ liveConf.setJournalMinFiles(2);
+ liveConf.setJournalDirectory(getJournalDir());
+ liveConf.setPagingDirectory(getPageDir());
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir());
+
+ liveService = HornetQ.newHornetQServer(liveConf, true);
liveService.start();
}
@@ -269,11 +275,11 @@
assertEquals(0, InVMRegistry.instance.size());
liveService = null;
-
+
backupService = null;
-
+
backupParams = null;
-
+
super.tearDown();
}
Deleted: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopWithReplicationTest.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -1,241 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.jms.server;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.FileConfiguration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.impl.HornetQSecurityManagerImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.integration.transports.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- *
- * A JMSServerStartStopWithReplicationTest
- *
- * Make sure live backup pair can be stopped and started ok multiple times with predefined queues etc
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class JMSServerStartStopWithReplicationTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(JMSServerStartStopWithReplicationTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final Map<String, Object> backupParams = new HashMap<String, Object>();
-
- private JMSServerManager liveJMSServer;
-
- private JMSServerManager backupJMSServer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testStopStartBackupBeforeLive() throws Exception
- {
- testStopStart1(true);
- }
-
- public void testStopStartLiveBeforeBackup() throws Exception
- {
- testStopStart1(false);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- this.liveJMSServer = null;
- this.backupJMSServer = null;
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- private void testStopStart1(final boolean backupBeforeLive) throws Exception
- {
- final int numMessages = 5;
-
- for (int j = 0; j < numMessages; j++)
- {
- log.info("Iteration " + j);
-
- startBackup();
- startLive();
-
- HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName()),
- new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(),
- backupParams));
-
- jbcf.setBlockOnPersistentSend(true);
- jbcf.setBlockOnNonPersistentSend(true);
-
- Connection conn = jbcf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = sess.createQueue("myJMSQueue");
-
- MessageProducer producer = sess.createProducer(queue);
-
- TextMessage tm = sess.createTextMessage("message" + j);
-
- producer.send(tm);
-
- conn.close();
-
- jbcf.close();
-
- if (backupBeforeLive)
- {
- stopBackup();
- stopLive();
- }
- else
- {
- stopLive();
- stopBackup();
- }
- }
-
- startBackup();
- startLive();
-
- HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName()),
- new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(),
- backupParams));
-
- jbcf.setBlockOnPersistentSend(true);
- jbcf.setBlockOnNonPersistentSend(true);
-
- Connection conn = jbcf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = sess.createQueue("myJMSQueue");
-
- MessageConsumer consumer = sess.createConsumer(queue);
-
- conn.start();
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage tm = (TextMessage)consumer.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- conn.close();
-
- jbcf.close();
-
- if (backupBeforeLive)
- {
- stopBackup();
- stopLive();
- }
- else
- {
- stopLive();
- stopBackup();
- }
- }
-
- private void stopLive() throws Exception
- {
- liveJMSServer.stop();
- }
-
- private void stopBackup() throws Exception
- {
- backupJMSServer.stop();
- }
-
- private void startLive() throws Exception
- {
- FileConfiguration fcLive = new FileConfiguration();
-
- fcLive.setConfigurationUrl("server-start-stop-live-config1.xml");
-
- fcLive.start();
-
- HornetQSecurityManager smLive = new HornetQSecurityManagerImpl();
-
- HornetQServer liveServer = new HornetQServerImpl(fcLive, smLive);
-
- liveJMSServer = new JMSServerManagerImpl(liveServer, "server-start-stop-live-jms-config1.xml");
-
- liveJMSServer.setContext(null);
-
- liveJMSServer.start();
- }
-
- private void startBackup() throws Exception
- {
- FileConfiguration fcBackup = new FileConfiguration();
-
- fcBackup.setConfigurationUrl("server-start-stop-backup-config1.xml");
-
- fcBackup.start();
-
- HornetQSecurityManager smBackup = new HornetQSecurityManagerImpl();
-
- HornetQServer liveServer = new HornetQServerImpl(fcBackup, smBackup);
-
- backupJMSServer = new JMSServerManagerImpl(liveServer, "server-start-stop-backup-jms-config1.xml");
-
- backupJMSServer.setContext(null);
-
- backupJMSServer.start();
- }
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-09-23 21:01:19 UTC (rev 7985)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-09-24 09:25:44 UTC (rev 7986)
@@ -270,7 +270,7 @@
}
}
- protected static Object checkBinding(Context context, String binding) throws Exception
+ protected static Object checkBinding(Context context, String binding) throws Exception
{
Object o = context.lookup(binding);
assertNotNull(o);
More information about the hornetq-commits
mailing list