[jboss-cvs] JBoss Messaging SVN: r5326 - in trunk: src/main/org/jboss/messaging/core/client and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 10 10:30:53 EST 2008


Author: timfox
Date: 2008-11-10 10:30:53 -0500 (Mon, 10 Nov 2008)
New Revision: 5326

Removed:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
Modified:
   trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/config/ConfigurationTest-config.xml
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
Simplified send management message


Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -21,6 +21,8 @@
  */
 package org.jboss.messaging.example;
 
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -57,9 +59,10 @@
 
       // create a management message to subscribe to notifications from the
       // server
+      ClientProducer producer = clientSession.createProducer(DEFAULT_MANAGEMENT_ADDRESS);
       ClientMessage mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, true);
-      clientSession.sendManagementMessage(mngmntMessage);
+      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
       System.out.println("send message to subscribe to notifications");
 
       ClientConsumer mngmntConsumer = clientSession.createConsumer(replytoQueue);
@@ -117,7 +120,7 @@
                                               ManagementServiceImpl.getMessagingServerObjectName(),
                                               "setMessageCounterSamplePeriod",
                                               (long)30000);
-      clientSession.sendManagementMessage(mngmntMessage);
+      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
       System.out.println("sent management message to set an attribute");
 
       // create a message to retrieve one or many attributes
@@ -128,7 +131,7 @@
                                      "MessageCount",
                                      "Durable");
 
-      clientSession.sendManagementMessage(mngmntMessage);
+      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
       System.out.println("sent management message to retrieve attributes");
 
       // create a message to invoke the operation sendMessageToDLQ(long) on the
@@ -139,14 +142,14 @@
                                               ManagementServiceImpl.getQueueObjectName(queue, queue),
                                               "sendMessageToDLQ",
                                               (long)6161);
-      clientSession.sendManagementMessage(mngmntMessage);
+      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
       System.out.println("sent management message to invoke operation");
 
       // create a message to unsubscribe from the notifications sent by the
       // server
       mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, false);
-      clientSession.sendManagementMessage(mngmntMessage);
+      producer.send(DEFAULT_MANAGEMENT_ADDRESS, mngmntMessage);
       System.out.println("send message to unsubscribe to notifications");
 
       Thread.sleep(5000);

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -116,6 +116,4 @@
    boolean removeFailureListener(FailureListener listener);
 
    int getVersion();
-   
-   void sendManagementMessage(ClientMessage message);
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -63,7 +63,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
@@ -745,16 +744,6 @@
       // Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
       remotingConnection.addFailureListener(this);
    }
-   
-   public void sendManagementMessage(final ClientMessage message)
-   {
-      Packet packet = new SessionSendManagementMessage(message);
-      
-      //Fill in the dest - it's not actually used
-      message.setDestination(new SimpleString("JBM"));
-      
-      channel.send(packet);
-   }
 
    // XAResource implementation
    // --------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -63,8 +63,6 @@
 
    public static final SimpleString HDR_JMX_NOTIFICATION = new SimpleString("JBMJMXNotification");
 
-   public static final SimpleString MANAGEMENT_DESTINATION = new SimpleString("admin.AdminDestination");
-
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -27,6 +27,7 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -87,6 +88,10 @@
    TransportConfiguration getBackupConnectorConfiguration();
 
    void setBackupConnectorConfiguration(TransportConfiguration config);
+   
+   SimpleString getManagementAddress();
+   
+   void setManagementAddress(SimpleString address);
 
    // Journal related attributes
    // ------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -20,6 +20,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
@@ -80,6 +81,8 @@
    public static final long DEFAULT_TRANSACTION_TIMEOUT = 60000;
 
    public static final long DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD = 1000;
+   
+   public static final SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("admin.management");
 
    // Attributes -----------------------------------------------------------------------------
 
@@ -144,6 +147,8 @@
    protected long transactionTimeout = DEFAULT_TRANSACTION_TIMEOUT;
 
    protected long transactionTimeoutScanPeriod = DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
+   
+   protected SimpleString managementAddress = DEFAULT_MANAGEMENT_ADDRESS;
 
    public boolean isClustered()
    {
@@ -424,6 +429,16 @@
    {
       return messageCounterEnabled;
    }
+   
+   public SimpleString getManagementAddress()
+   {
+      return managementAddress;
+   }
+   
+   public void setManagementAddress(SimpleString address)
+   {
+      this.managementAddress = address;
+   }
 
    @Override
    public boolean equals(final Object other)
@@ -454,7 +469,8 @@
              cother.getJournalMinFiles() == getJournalMinFiles() &&
              cother.getJournalType() == getJournalType() &&
              cother.getScheduledThreadPoolMaxSize() == getScheduledThreadPoolMaxSize() &&
-             cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval();
+             cother.getSecurityInvalidationInterval() == getSecurityInvalidationInterval() &&
+             cother.getManagementAddress().equals(getManagementAddress());
    }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.XMLUtil;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
@@ -91,6 +92,8 @@
       transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout);
 
       transactionTimeoutScanPeriod = getLong(e, "transaction-timeout-scan-period", transactionTimeoutScanPeriod);
+      
+      managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
             
       NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -40,7 +40,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
@@ -125,7 +124,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -788,11 +786,6 @@
             packet = new NullResponseMessage();
             break;
          }
-         case SESS_MANAGEMENT_SEND:
-         {
-            packet = new SessionSendManagementMessage();
-            break;
-         }
          case SESS_REPLICATE_DELIVERY:
          {
             packet = new SessionReplicateDeliveryMessage();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -146,12 +146,10 @@
 
    public static final byte SESS_RECEIVE_MSG = 79;
 
-   public static final byte SESS_MANAGEMENT_SEND = 80;
+   public static final byte SESS_FAILOVER_COMPLETE = 80;
 
-   public static final byte SESS_FAILOVER_COMPLETE = 81;
+   public static final byte SESS_REPLICATE_DELIVERY = 81;
 
-   public static final byte SESS_REPLICATE_DELIVERY = 82;
-
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -1,106 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- * 
- * @version <tt>$Revision$</tt>
- */
-public class SessionSendManagementMessage extends PacketImpl
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(SessionSendManagementMessage.class);
-   
-   // Attributes ----------------------------------------------------
-
-   private ClientMessage clientMessage;
-   
-   private ServerMessage serverMessage;
-   
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public SessionSendManagementMessage(final ClientMessage message)
-   {
-      super(SESS_MANAGEMENT_SEND);
-
-      this.clientMessage = message;
-   }
-      
-   public SessionSendManagementMessage()
-   {
-      super(SESS_MANAGEMENT_SEND);
-   }
-
-   // Public --------------------------------------------------------
-
-   public ClientMessage getClientMessage()
-   {
-      return clientMessage;
-   }
-   
-   public ServerMessage getServerMessage()
-   {
-      return serverMessage;
-   }
-   
-   public void encodeBody(final MessagingBuffer buffer)
-   {
-      if (clientMessage != null)
-      {
-         clientMessage.encode(buffer);
-      }
-      else
-      {
-         //If we're replicating a buffer to a backup node then we encode the serverMessage not the clientMessage
-         serverMessage.encode(buffer);
-      }
-   }
-   
-   public void decodeBody(final MessagingBuffer buffer)
-   {          
-      serverMessage = new ServerMessageImpl();
-      
-      serverMessage.decode(buffer);
-      
-      serverMessage.getBody().flip();
-   }
-
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -39,7 +39,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -137,8 +136,6 @@
 
    void handleSendProducerMessage(SessionSendMessage packet);
 
-   void handleManagementMessage(SessionSendManagementMessage packet);
-
    void handleFailedOver(Packet packet);
    
    void handleClose(Packet packet);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -635,7 +635,8 @@
                                                               channel,
                                                               managementService,
                                                               this,
-                                                              simpleStringIdGenerator);
+                                                              simpleStringIdGenerator,
+                                                              configuration.getManagementAddress());
 
       sessions.put(name, session);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -67,7 +67,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -169,6 +168,8 @@
    private final MessagingServer server;
 
    private final SimpleStringIdGenerator simpleStringIdGenerator;
+   
+   private final SimpleString managementAddress;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -189,7 +190,8 @@
                             final Channel channel,
                             final ManagementService managementService,
                             final MessagingServer server,
-                            final SimpleStringIdGenerator simpleStringIdGenerator) throws Exception
+                            final SimpleStringIdGenerator simpleStringIdGenerator,
+                            final SimpleString managementAddress) throws Exception
    {
       this.id = id;
 
@@ -207,7 +209,7 @@
 
       this.postOffice = postOffice;
 
-      pager = postOffice.getPagingManager();
+      this.pager = postOffice.getPagingManager();
 
       this.queueSettingsRepository = queueSettingsRepository;
 
@@ -231,6 +233,8 @@
       this.server = server;
 
       this.simpleStringIdGenerator = simpleStringIdGenerator;
+      
+      this.managementAddress = managementAddress;
    }
 
    // ServerSession implementation ----------------------------------------------------------------------------
@@ -2292,7 +2296,10 @@
    }
 
    public void handleSendProducerMessage(final SessionSendMessage packet)
-   {                     
+   {         
+      //With a send we must make sure it is replicated to backup before being processed on live
+      //or can end up with delivery being processed on backup before original send
+         
       ServerMessage msg = packet.getServerMessage();
       
       final SendLock lock;
@@ -2341,14 +2348,22 @@
    
    private void doSend(final SessionSendMessage packet)
    {
-      //With a send we must make sure it is replicated to backup before being processed on live
-      //or can end up with delivery being processed on backup before original send
-         
       Packet response = null;
 
       try
       {
-         producers.get(packet.getProducerID()).send(packet.getServerMessage());
+         ServerMessage message = packet.getServerMessage();
+         
+         if (message.getDestination().equals(managementAddress))
+         {
+            //It's a management message
+            
+            doHandleManagementMessage(message);
+         }
+         else
+         {         
+            producers.get(packet.getProducerID()).send(message);
+         }
 
          if (packet.isRequiresResponse())
          {
@@ -2380,86 +2395,41 @@
       }
    }
    
-  
-   public void handleManagementMessage(final SessionSendManagementMessage packet)
-   {        
-      ServerMessage msg = packet.getServerMessage();
-      
-      if (msg.getMessageID() == 0L)
-      {
-         // must generate message id here, so we know they are in sync on live and backup
-         long id = storageManager.generateUniqueID();
-
-         msg.setMessageID(id);
-      }
-      
-      DelayedResult result = channel.replicatePacket(packet);
-      
-      //With a send we must make sure it is replicated to backup before being processed on live
-      //or can end up with delivery being processed on backup before original send
-      
-      if (result == null)
-      {
-         doHandleManagementMessage(packet);                        
-      }
-      else
-      {
-         result.setResultRunner(new Runnable()
-         {
-            public void run()
-            {
-               doHandleManagementMessage(packet);
-            }
-         });
-      }
-   }
-
-   public void doHandleManagementMessage(final SessionSendManagementMessage packet)
+   private void doHandleManagementMessage(final ServerMessage message) throws Exception
    {
-      try
+      if (message.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
       {
-         ServerMessage message = packet.getServerMessage();
+         boolean subscribe = (Boolean)message.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
 
-         if (message.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
-         {
-            boolean subscribe = (Boolean)message.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
+         final SimpleString replyTo = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
 
-            final SimpleString replyTo = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
-
-            if (subscribe)
+         if (subscribe)
+         {
+            if (log.isDebugEnabled())
             {
-               if (log.isDebugEnabled())
-               {
-                  log.debug("added notification listener " + this);
-               }
-
-               managementService.addNotificationListener(this, null, replyTo);
+               log.debug("added notification listener " + this);
             }
-            else
-            {
-               if (log.isDebugEnabled())
-               {
-                  log.debug("removed notification listener " + this);
-               }
 
-               managementService.removeNotificationListener(this);
-            }
+            managementService.addNotificationListener(this, null, replyTo);
          }
          else
          {
-            managementService.handleMessage(message);
-            
-            message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
+            if (log.isDebugEnabled())
+            {
+               log.debug("removed notification listener " + this);
+            }
 
-            send(message);
+            managementService.removeNotificationListener(this);
          }
       }
-      catch (Exception e)
+      else
       {
-         log.error("Failed to send management message", e);        
+         managementService.handleMessage(message);
+         
+         message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
+
+         send(message);
       }
-      
-      channel.confirm(packet);
    }
 
    public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -26,7 +26,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
@@ -66,7 +65,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -299,12 +297,6 @@
                session.handleSendProducerMessage(message);
                break;
             }
-            case SESS_MANAGEMENT_SEND:
-            {
-               SessionSendManagementMessage message = (SessionSendManagementMessage)packet;
-               session.handleManagementMessage(message);
-               break;
-            }
             case SESS_REPLICATE_DELIVERY:
             {
                SessionReplicateDeliveryMessage message = (SessionReplicateDeliveryMessage)packet;

Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/config/ConfigurationTest-config.xml	2008-11-10 15:30:53 UTC (rev 5326)
@@ -13,6 +13,7 @@
       <connection-scan-period>6543</connection-scan-period>
       <transaction-timeout>98765</transaction-timeout>
       <transaction-timeout-scan-period>56789</transaction-timeout-scan-period>
+      <management-address>Giraffe</management-address>
       <remoting-interceptors>
          <class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1</class-name>
          <class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor2</class-name>

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.tests.integration.cluster;
 
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -122,7 +124,7 @@
          
          managementMessage.getBody().flip();
          
-         session1.sendManagementMessage(managementMessage);
+         producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
       }
                             
       ClientConsumer consumer1 = session1.createConsumer(replyTo);
@@ -143,7 +145,7 @@
          
          managementMessage.getBody().flip();
          
-         session1.sendManagementMessage(managementMessage);
+         producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
       }
             
       session1.start();
@@ -217,7 +219,7 @@
          
          managementMessage.getBody().flip();
          
-         session1.sendManagementMessage(managementMessage);
+         producer.send(DEFAULT_MANAGEMENT_ADDRESS, managementMessage);
       }
                             
       ClientConsumer consumer1 = session1.createConsumer(replyTo);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -31,13 +31,13 @@
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.List;
 
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -73,6 +73,7 @@
       assertEquals(ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED, conf.isWildcardRoutingEnabled());
       assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT, conf.getTransactionTimeout());
       assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD, conf.getTransactionTimeoutScanPeriod());
+      assertEquals(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS, conf.getManagementAddress());
    }
    
    public void testSetGetAttributes()
@@ -150,7 +151,11 @@
          
          i = randomInt();
          conf.setJournalMaxAIO(i);
-         assertEquals(i, conf.getJournalMaxAIO());        
+         assertEquals(i, conf.getJournalMaxAIO());  
+         
+         s = randomString();
+         conf.setManagementAddress(new SimpleString(s));
+         assertEquals(s, conf.getManagementAddress().toString());
       }
    }
    
@@ -224,6 +229,9 @@
  
       i = randomInt();
       conf.setJournalMaxAIO(i);
+      
+      s = randomString();
+      conf.setManagementAddress(new SimpleString(s));
   
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       ObjectOutputStream oos = new ObjectOutputStream(baos);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2008-11-10 14:50:54 UTC (rev 5325)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2008-11-10 15:30:53 UTC (rev 5326)
@@ -27,6 +27,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.FileConfiguration;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -58,7 +59,7 @@
       assertEquals(true, conf.isWildcardRoutingEnabled());
       assertEquals(98765, conf.getTransactionTimeout());
       assertEquals(56789, conf.getTransactionTimeoutScanPeriod());
-      
+      assertEquals(new SimpleString("Giraffe"), conf.getManagementAddress());
       assertEquals(2, conf.getInterceptorClassNames().size());
       assertTrue(conf.getInterceptorClassNames().contains("org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1"));
       assertTrue(conf.getInterceptorClassNames().contains("org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor2"));




More information about the jboss-cvs-commits mailing list