[jboss-cvs] JBoss Messaging SVN: r5326 - in trunk: src/main/org/jboss/messaging/core/client and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 10 10:30:53 EST 2008
Author: timfox
Date: 2008-11-10 10:30:53 -0500 (Mon, 10 Nov 2008)
New Revision: 5326
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
Modified:
trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/config/ConfigurationTest-config.xml
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
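Simplified sending of management messages: removed the dedicated SessionSendManagementMessage packet (SESS_MANAGEMENT_SEND) and ClientSession.sendManagementMessage(). Management messages are now sent with a regular ClientProducer to the management address, which is configurable via <management-address> (default "admin.management"); the server session treats any send to that address as a management message.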
Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.example;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -57,9 +59,10 @@
// create a management message to subscribe to notifications from the
// server
+ ClientProducer producer = clientSession.createProducer(DEFAULT_MANAGEMENT_ADDRESS);
ClientMessage mngmntMessage = clientSession.createClientMessage(false);
ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, true);
- clientSession.sendManagementMessage(mngmntMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
System.out.println("send message to subscribe to notifications");
ClientConsumer mngmntConsumer = clientSession.createConsumer(replytoQueue);
@@ -117,7 +120,7 @@
ManagementServiceImpl.getMessagingServerObjectName(),
"setMessageCounterSamplePeriod",
(long)30000);
- clientSession.sendManagementMessage(mngmntMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
System.out.println("sent management message to set an attribute");
// create a message to retrieve one or many attributes
@@ -128,7 +131,7 @@
"MessageCount",
"Durable");
- clientSession.sendManagementMessage(mngmntMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
System.out.println("sent management message to retrieve attributes");
// create a message to invoke the operation sendMessageToDLQ(long) on the
@@ -139,14 +142,14 @@
ManagementServiceImpl.getQueueObjectName(queue, queue),
"sendMessageToDLQ",
(long)6161);
- clientSession.sendManagementMessage(mngmntMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
System.out.println("sent management message to invoke operation");
// create a message to unsubscribe from the notifications sent by the
// server
mngmntMessage = clientSession.createClientMessage(false);
ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, false);
- clientSession.sendManagementMessage(mngmntMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
System.out.println("send message to unsubscribe to notifications");
Thread.sleep(5000);
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -116,6 +116,4 @@
boolean removeFailureListener(FailureListener listener);
int getVersion();
-
- void sendManagementMessage(ClientMessage message);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -63,7 +63,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
@@ -745,16 +744,6 @@
// Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
remotingConnection.addFailureListener(this);
}
-
- public void sendManagementMessage(final ClientMessage message)
- {
- Packet packet = new SessionSendManagementMessage(message);
-
- //Fill in the dest - it's not actually used
- message.setDestination(new SimpleString("JBM"));
-
- channel.send(packet);
- }
// XAResource implementation
// --------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -63,8 +63,6 @@
public static final SimpleString HDR_JMX_NOTIFICATION = new SimpleString("JBMJMXNotification");
- public static final SimpleString MANAGEMENT_DESTINATION = new SimpleString("admin.AdminDestination");
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -27,6 +27,7 @@
import java.util.Set;
import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -87,6 +88,10 @@
TransportConfiguration getBackupConnectorConfiguration();
void setBackupConnectorConfiguration(TransportConfiguration config);
+
+ SimpleString getManagementAddress();
+
+ void setManagementAddress(SimpleString address);
// Journal related attributes
// ------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -20,6 +20,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
/**
* @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
@@ -80,6 +81,8 @@
public static final long DEFAULT_TRANSACTION_TIMEOUT = 60000;
public static final long DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD = 1000;
+
+ public static final SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("admin.management");
// Attributes -----------------------------------------------------------------------------
@@ -144,6 +147,8 @@
protected long transactionTimeout = DEFAULT_TRANSACTION_TIMEOUT;
protected long transactionTimeoutScanPeriod = DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
+
+ protected SimpleString managementAddress = DEFAULT_MANAGEMENT_ADDRESS;
public boolean isClustered()
{
@@ -424,6 +429,16 @@
{
return messageCounterEnabled;
}
+
+ public SimpleString getManagementAddress()
+ {
+ return managementAddress;
+ }
+
+ public void setManagementAddress(SimpleString address)
+ {
+ this.managementAddress = address;
+ }
@Override
public boolean equals(final Object other)
@@ -454,7 +469,8 @@
cother.getJournalMinFiles() == getJournalMinFiles() &&
cother.getJournalType() == getJournalType() &&
cother.getScheduledThreadPoolMaxSize() == getScheduledThreadPoolMaxSize() &&
- cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval();
+ cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval() &&
+ cother.getManagementAddress().equals(getManagementAddress());
}
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.XMLUtil;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
@@ -91,6 +92,8 @@
transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout);
transactionTimeoutScanPeriod = getLong(e, "transaction-timeout-scan-period", transactionTimeoutScanPeriod);
+
+ managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -40,7 +40,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
@@ -125,7 +124,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -788,11 +786,6 @@
packet = new NullResponseMessage();
break;
}
- case SESS_MANAGEMENT_SEND:
- {
- packet = new SessionSendManagementMessage();
- break;
- }
case SESS_REPLICATE_DELIVERY:
{
packet = new SessionReplicateDeliveryMessage();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -146,12 +146,10 @@
public static final byte SESS_RECEIVE_MSG = 79;
- public static final byte SESS_MANAGEMENT_SEND = 80;
+ public static final byte SESS_FAILOVER_COMPLETE = 80;
- public static final byte SESS_FAILOVER_COMPLETE = 81;
+ public static final byte SESS_REPLICATE_DELIVERY = 81;
- public static final byte SESS_REPLICATE_DELIVERY = 82;
-
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -1,106 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionSendManagementMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(SessionSendManagementMessage.class);
-
- // Attributes ----------------------------------------------------
-
- private ClientMessage clientMessage;
-
- private ServerMessage serverMessage;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionSendManagementMessage(final ClientMessage message)
- {
- super(SESS_MANAGEMENT_SEND);
-
- this.clientMessage = message;
- }
-
- public SessionSendManagementMessage()
- {
- super(SESS_MANAGEMENT_SEND);
- }
-
- // Public --------------------------------------------------------
-
- public ClientMessage getClientMessage()
- {
- return clientMessage;
- }
-
- public ServerMessage getServerMessage()
- {
- return serverMessage;
- }
-
- public void encodeBody(final MessagingBuffer buffer)
- {
- if (clientMessage != null)
- {
- clientMessage.encode(buffer);
- }
- else
- {
- //If we're replicating a buffer to a backup node then we encode the serverMessage not the clientMessage
- serverMessage.encode(buffer);
- }
- }
-
- public void decodeBody(final MessagingBuffer buffer)
- {
- serverMessage = new ServerMessageImpl();
-
- serverMessage.decode(buffer);
-
- serverMessage.getBody().flip();
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -39,7 +39,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -137,8 +136,6 @@
void handleSendProducerMessage(SessionSendMessage packet);
- void handleManagementMessage(SessionSendManagementMessage packet);
-
void handleFailedOver(Packet packet);
void handleClose(Packet packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -635,7 +635,8 @@
channel,
managementService,
this,
- simpleStringIdGenerator);
+ simpleStringIdGenerator,
+ configuration.getManagementAddress());
sessions.put(name, session);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -67,7 +67,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -169,6 +168,8 @@
private final MessagingServer server;
private final SimpleStringIdGenerator simpleStringIdGenerator;
+
+ private final SimpleString managementAddress;
// Constructors ---------------------------------------------------------------------------------
@@ -189,7 +190,8 @@
final Channel channel,
final ManagementService managementService,
final MessagingServer server,
- final SimpleStringIdGenerator simpleStringIdGenerator) throws Exception
+ final SimpleStringIdGenerator simpleStringIdGenerator,
+ final SimpleString managementAddress) throws Exception
{
this.id = id;
@@ -207,7 +209,7 @@
this.postOffice = postOffice;
- pager = postOffice.getPagingManager();
+ this.pager = postOffice.getPagingManager();
this.queueSettingsRepository = queueSettingsRepository;
@@ -231,6 +233,8 @@
this.server = server;
this.simpleStringIdGenerator = simpleStringIdGenerator;
+
+ this.managementAddress = managementAddress;
}
// ServerSession implementation ----------------------------------------------------------------------------
@@ -2292,7 +2296,10 @@
}
public void handleSendProducerMessage(final SessionSendMessage packet)
- {
+ {
+ //With a send we must make sure it is replicated to backup before being processed on live
+ //or can end up with delivery being processed on backup before original send
+
ServerMessage msg = packet.getServerMessage();
final SendLock lock;
@@ -2341,14 +2348,22 @@
private void doSend(final SessionSendMessage packet)
{
- //With a send we must make sure it is replicated to backup before being processed on live
- //or can end up with delivery being processed on backup before original send
-
Packet response = null;
try
{
- producers.get(packet.getProducerID()).send(packet.getServerMessage());
+ ServerMessage message = packet.getServerMessage();
+
+ if (message.getDestination().equals(managementAddress))
+ {
+ //It's a management message
+
+ doHandleManagementMessage(message);
+ }
+ else
+ {
+ producers.get(packet.getProducerID()).send(message);
+ }
if (packet.isRequiresResponse())
{
@@ -2380,86 +2395,41 @@
}
}
-
- public void handleManagementMessage(final SessionSendManagementMessage packet)
- {
- ServerMessage msg = packet.getServerMessage();
-
- if (msg.getMessageID() == 0L)
- {
- // must generate message id here, so we know they are in sync on live and backup
- long id = storageManager.generateUniqueID();
-
- msg.setMessageID(id);
- }
-
- DelayedResult result = channel.replicatePacket(packet);
-
- //With a send we must make sure it is replicated to backup before being processed on live
- //or can end up with delivery being processed on backup before original send
-
- if (result == null)
- {
- doHandleManagementMessage(packet);
- }
- else
- {
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleManagementMessage(packet);
- }
- });
- }
- }
-
- public void doHandleManagementMessage(final SessionSendManagementMessage packet)
+ private void doHandleManagementMessage(final ServerMessage message) throws Exception
{
- try
+ if (message.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
{
- ServerMessage message = packet.getServerMessage();
+ boolean subscribe = (Boolean)message.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
- if (message.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
- {
- boolean subscribe = (Boolean)message.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
+ final SimpleString replyTo = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
- final SimpleString replyTo = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
-
- if (subscribe)
+ if (subscribe)
+ {
+ if (log.isDebugEnabled())
{
- if (log.isDebugEnabled())
- {
- log.debug("added notification listener " + this);
- }
-
- managementService.addNotificationListener(this, null, replyTo);
+ log.debug("added notification listener " + this);
}
- else
- {
- if (log.isDebugEnabled())
- {
- log.debug("removed notification listener " + this);
- }
- managementService.removeNotificationListener(this);
- }
+ managementService.addNotificationListener(this, null, replyTo);
}
else
{
- managementService.handleMessage(message);
-
- message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
+ if (log.isDebugEnabled())
+ {
+ log.debug("removed notification listener " + this);
+ }
- send(message);
+ managementService.removeNotificationListener(this);
}
}
- catch (Exception e)
+ else
{
- log.error("Failed to send management message", e);
+ managementService.handleMessage(message);
+
+ message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
+
+ send(message);
}
-
- channel.confirm(packet);
}
public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -26,7 +26,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
@@ -66,7 +65,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -299,12 +297,6 @@
session.handleSendProducerMessage(message);
break;
}
- case SESS_MANAGEMENT_SEND:
- {
- SessionSendManagementMessage message = (SessionSendManagementMessage)packet;
- session.handleManagementMessage(message);
- break;
- }
case SESS_REPLICATE_DELIVERY:
{
SessionReplicateDeliveryMessage message = (SessionReplicateDeliveryMessage)packet;
Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/config/ConfigurationTest-config.xml 2008-11-10 15:30:53 UTC (rev 5326)
@@ -13,6 +13,7 @@
<connection-scan-period>6543</connection-scan-period>
<transaction-timeout>98765</transaction-timeout>
<transaction-timeout-scan-period>56789</transaction-timeout-scan-period>
+ <management-address>Giraffe</management-address>
<remoting-interceptors>
<class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor2</class-name>
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -22,6 +22,8 @@
package org.jboss.messaging.tests.integration.cluster;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+
import java.util.HashMap;
import java.util.Map;
@@ -122,7 +124,7 @@
managementMessage.getBody().flip();
- session1.sendManagementMessage(managementMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
}
ClientConsumer consumer1 = session1.createConsumer(replyTo);
@@ -143,7 +145,7 @@
managementMessage.getBody().flip();
- session1.sendManagementMessage(managementMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
}
session1.start();
@@ -217,7 +219,7 @@
managementMessage.getBody().flip();
- session1.sendManagementMessage(managementMessage);
+ producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
}
ClientConsumer consumer1 = session1.createConsumer(replyTo);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -31,13 +31,13 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.List;
import junit.framework.TestCase;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -73,6 +73,7 @@
assertEquals(ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED, conf.isWildcardRoutingEnabled());
assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT, conf.getTransactionTimeout());
assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD, conf.getTransactionTimeoutScanPeriod());
+ assertEquals(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS, conf.getManagementAddress());
}
public void testSetGetAttributes()
@@ -150,7 +151,11 @@
i = randomInt();
conf.setJournalMaxAIO(i);
- assertEquals(i, conf.getJournalMaxAIO());
+ assertEquals(i, conf.getJournalMaxAIO());
+
+ s = randomString();
+ conf.setManagementAddress(new SimpleString(s));
+ assertEquals(s, conf.getManagementAddress().toString());
}
}
@@ -224,6 +229,9 @@
i = randomInt();
conf.setJournalMaxAIO(i);
+
+ s = randomString();
+ conf.setManagementAddress(new SimpleString(s));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-11-10 15:30:53 UTC (rev 5326)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.FileConfiguration;
+import org.jboss.messaging.util.SimpleString;
/**
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -58,7 +59,7 @@
assertEquals(true, conf.isWildcardRoutingEnabled());
assertEquals(98765, conf.getTransactionTimeout());
assertEquals(56789, conf.getTransactionTimeoutScanPeriod());
-
+ assertEquals(new SimpleString("Giraffe"), conf.getManagementAddress());
assertEquals(2, conf.getInterceptorClassNames().size());
assertTrue(conf.getInterceptorClassNames().contains("org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1"));
assertTrue(conf.getInterceptorClassNames().contains("org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor2"));
More information about the jboss-cvs-commits mailing list