[jboss-cvs] JBoss Messaging SVN: r5702 - in trunk: src/main/org/jboss/messaging/core/client/management/impl and 14 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 23 08:28:05 EST 2009


Author: timfox
Date: 2009-01-23 08:28:05 -0500 (Fri, 23 Jan 2009)
New Revision: 5702

Added:
   trunk/src/main/org/jboss/messaging/core/management/Notification.java
   trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java
   trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
   trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/Consumer.java
   trunk/src/main/org/jboss/messaging/core/server/Distributor.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
More clustering work


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -233,7 +233,7 @@
 
       SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
 
-      if (msg.getEncodeSize() > minLargeMessageSize)
+      if (msg.getEncodeSize() >= minLargeMessageSize)
       {
          sendMessageInChunks(sendBlocking, msg);
       }
@@ -255,7 +255,7 @@
    {
       int headerSize = msg.getPropertiesEncodeSize();
 
-      if (headerSize > minLargeMessageSize)
+      if (headerSize >= minLargeMessageSize)
       {
          throw new MessagingException(MessagingException.ILLEGAL_STATE,
                                       "Header size (" + headerSize + ") is too big, use the messageBody for large data, or increase minLargeMessageSize");

Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -48,24 +48,28 @@
 
    // Constants -----------------------------------------------------
 
-   public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("JBMJMXObjectName");
+   public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("JBM_JMX_ObjectName");
 
-   public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("JBMJMXAttribute.");
+   public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("JBM_JMXAttribute.");
 
-   public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("JBMJMXOperation.");
+   public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("JBM_JMXOperation.");
 
    public static final SimpleString HDR_JMX_OPERATION_NAME = new SimpleString(HDR_JMX_OPERATION_PREFIX + "name");
 
-   public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("JBMJMXOperationSucceeded");
+   public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("JBM_JMXOperationSucceeded");
 
-   public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBMJMXOperationException");
+   public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBM_JMXOperationException");
 
-   public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBMNotifType");
+   public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBM_NotifType");
 
-   public static final SimpleString HDR_NOTIFICATION_MESSAGE = new SimpleString("JBMNotifMessage");
+   public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBM_NotifTimestamp");
+   
+   public static final SimpleString HDR_QUEUE_NAME = new SimpleString("JBM_QueueName");
+   
+   public static final SimpleString HDR_ADDRESS = new SimpleString("JBM_Address");
+   
+   public static final SimpleString HDR_FILTERSTRING = new SimpleString("JBM_FilterString");
 
-   public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBMNotifTimestamp");
-
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -33,7 +33,6 @@
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
-import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -44,6 +43,7 @@
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -108,10 +108,8 @@
 
    public Object getResource(ObjectName objectName);
 
-   void handleMessage(Message message);
+   void handleMessage(ServerMessage message);  
 
-   void sendNotification(NotificationType type, String message) throws Exception;
-
    /** 
     * the message corresponding to a notification will always contain the properties:
     * <ul>
@@ -124,7 +122,11 @@
     * 
     * @see ManagementHelper
     */
-   void sendNotification(NotificationType type, String message, TypedProperties props) throws Exception;
+   void sendNotification(Notification notification) throws Exception;
 
    void enableNotifications(boolean enable);
+   
+   void addNotificationListener(NotificationListener listener);
+   
+   void removeNotificationListener(NotificationListener listener);
 }

Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -30,6 +30,7 @@
 import javax.management.openmbean.TabularData;
 
 import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * This interface describes the core management interface exposed by the server
@@ -164,5 +165,7 @@
    String connectionID);
 
    TabularData getConnectors() throws Exception;
+   
+   void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
 
 }

Added: trunk/src/main/org/jboss/messaging/core/management/Notification.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/Notification.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/Notification.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.management;
+
+import org.jboss.messaging.util.TypedProperties;
+
+/**
+ * A Notification
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 22 Jan 2009 16:41:12
+ *
+ *
+ */
+public class Notification
+{
+   private final NotificationType type;
+   
+   private final TypedProperties properties;
+      
+   public Notification(final NotificationType type, final TypedProperties properties)
+   {
+      this.type = type;
+      this.properties = properties;
+   }
+   
+   public NotificationType getType()
+   {
+      return type;
+   }
+
+   public TypedProperties getProperties()
+   {
+      return properties;
+   }
+}

Added: trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.management;
+
+/**
+ * A NotificationListener
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 22 Jan 2009 16:48:27
+ *
+ *
+ */
+public interface NotificationListener
+{
+   void onNotification(Notification notification);
+}

Modified: trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationType.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationType.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -31,5 +31,5 @@
  */
 public enum NotificationType
 {
-   QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED;
+   QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED, CONSUMER_CREATED, CONSUMER_CLOSED;
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -55,11 +55,12 @@
 import org.jboss.messaging.core.management.DiscoveryGroupControlMBean;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationListener;
 import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareAddressControlWrapper;
 import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareMessagingServerControlWrapper;
 import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareQueueControlWrapper;
-import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.messagecounter.MessageCounter;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -79,11 +80,13 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TypedProperties;
 
 /*
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:fox at redhat.com">Tim Fox</a>
  * 
  * @version <tt>$Revision$</tt>
  */
@@ -122,7 +125,9 @@
    private boolean started = false;
 
    private boolean noticationsEnabled;
-
+      
+   private final Set<NotificationListener> listeners = new ConcurrentHashSet<NotificationListener>();
+   
    // Static --------------------------------------------------------
 
    public static ObjectName getMessagingServerObjectName() throws Exception
@@ -233,19 +238,31 @@
       AddressControl addressControl = new AddressControl(address, postOffice, securityRepository);
 
       registerInJMX(objectName, new ReplicationAwareAddressControlWrapper(objectName, addressControl));
+      
       registerInRegistry(objectName, addressControl);
+      
       if (log.isDebugEnabled())
       {
          log.debug("registered address " + objectName);
       }
-      sendNotification(NotificationType.ADDRESS_ADDED, address.toString());
+      TypedProperties props = new TypedProperties();
+      
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+            
+      sendNotification(new Notification(NotificationType.ADDRESS_ADDED, props));
    }
 
    public void unregisterAddress(final SimpleString address) throws Exception
    {
       ObjectName objectName = getAddressObjectName(address);
+      
       unregisterResource(objectName);
-      sendNotification(NotificationType.ADDRESS_REMOVED, address.toString());
+      
+      TypedProperties props = new TypedProperties();
+      
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      
+      sendNotification(new Notification(NotificationType.ADDRESS_REMOVED, props));
    }
 
    public void registerQueue(final Queue queue, final SimpleString address, final StorageManager storageManager) throws Exception
@@ -266,7 +283,13 @@
       {
          log.debug("registered queue " + objectName);
       }
-      sendNotification(NotificationType.QUEUE_CREATED, queue.getName().toString());
+      
+      TypedProperties props = new TypedProperties();
+      
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, queue.getName());
+      
+      sendNotification(new Notification(NotificationType.QUEUE_CREATED, props));
    }
 
    public void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
@@ -274,8 +297,13 @@
       ObjectName objectName = getQueueObjectName(address, name);
       unregisterResource(objectName);
       messageCounterManager.unregisterMessageCounter(name.toString());
+      
+      TypedProperties props = new TypedProperties();
+      
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, name);
 
-      sendNotification(NotificationType.QUEUE_DESTROYED, name.toString());
+      sendNotification(new Notification(NotificationType.QUEUE_DESTROYED, props));
    }
 
    public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
@@ -334,7 +362,7 @@
       unregisterResource(objectName);
    }
 
-   public void handleMessage(final Message message)
+   public void handleMessage(final ServerMessage message)
    {
       SimpleString objectName = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_OBJECTNAME);
       if (log.isDebugEnabled())
@@ -425,6 +453,16 @@
       registry.put(objectName, managedResource);
    }
 
+   public void addNotificationListener(final NotificationListener listener)
+   {
+      listeners.add(listener);
+   }
+   
+   public void removeNotificationListener(final NotificationListener listener)
+   {
+      listeners.remove(listener);
+   }
+
    // MessagingComponent implementation -----------------------------
 
    public void start() throws Exception
@@ -476,45 +514,53 @@
          }
       }
    }
-
-   public void sendNotification(final NotificationType type, final String message) throws Exception
-   {
-      sendNotification(type, message, null);
-   }
-
-   public void sendNotification(final NotificationType type, final String message, TypedProperties props) throws Exception
-   {
-      // TODO - we need a parameter to determine if the notification is durable or not
+      
+   public void sendNotification(final Notification notification) throws Exception
+   {     
       if (managedServer != null && noticationsEnabled)
       {
-         ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
-         notificationMessage.setDestination(managementNotificationAddress);
-         notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
-
-         TypedProperties notifProps;
-         if (props != null)
+         //This needs to be synchronized since we need to ensure notifications are processed in strict sequence
+         synchronized (this)
          {
-            notifProps = props;
+            //First send to any local listeners
+            for (NotificationListener listener: listeners)
+            {
+               try
+               {
+                  listener.onNotification(notification);
+               }
+               catch (Exception e)
+               {
+                  //Exception thrown from one listener should not stop execution of others
+                  log.error("Failed to call listener", e);
+               }            
+            }
+            
+            //Now send message
+            
+            ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+            notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+            //Notification messages are always durable so the user can choose whether to add a durable queue to consume them in
+            notificationMessage.setDurable(true);
+            notificationMessage.setDestination(managementNotificationAddress);
+               
+            TypedProperties notifProps;
+            if (notification.getProperties() != null)
+            {
+               notifProps = notification.getProperties();
+            }
+            else
+            {
+               notifProps = new TypedProperties();
+            }
+   
+            notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));        
+            notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+            
+            notificationMessage.putTypedProperties(notifProps);
+            
+            postOffice.route(notificationMessage, null);
          }
-         else
-         {
-            notifProps = new TypedProperties();
-         }
-
-         notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
-         notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_MESSAGE, new SimpleString(message));
-         notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-
-         notificationMessage.putTypedProperties(notifProps);
-
-         // List<MessageReference> refs = postOffice.route(notificationMessage);
-         //
-         // for (MessageReference ref : refs)
-         // {
-         // ref.getQueue().add(ref);
-         // }
-
-         postOffice.route(notificationMessage, null);
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -57,7 +57,6 @@
 import org.jboss.messaging.core.postoffice.impl.BindingImpl;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.RemotingService;
-import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
@@ -544,6 +543,11 @@
       Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations().values();
       return TransportConfigurationInfo.toTabularData(connectorConfigurations);
    }
+   
+   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+   {
+      postOffice.sendQueueInfoToQueue(queueName);
+   }
 
    // NotificationEmitter implementation ----------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.core.management.jmx.impl;
 
 import java.util.List;
-import java.util.Map;
 
 import javax.management.MBeanInfo;
 import javax.management.ObjectName;
@@ -33,6 +32,7 @@
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
 import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
 import org.jboss.messaging.core.management.impl.MessagingServerControl;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * A ReplicationAwareMessagingServerControlWrapper
@@ -64,12 +64,11 @@
 
    // MessagingServerControlMBean implementation ------------------------------
 
-   
    public String getBackupConnectorName()
    {
       return localControl.getBackupConnectorName();
    }
-   
+
    public String getBindingsDirectory()
    {
       return localControl.getBindingsDirectory();
@@ -234,12 +233,17 @@
    {
       return localControl.listSessions(connectionID);
    }
-   
+
    public TabularData getConnectors() throws Exception
    {
       return localControl.getConnectors();
    }
-   
+
+   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+   {
+      localControl.sendQueueInfoToQueue(queueName);
+   }
+
    public boolean addAddress(String address) throws Exception
    {
       return (Boolean)replicationAwareInvoke("addAddress", address);

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -55,22 +55,24 @@
 
    private static final Logger log = Logger.getLogger(MessageImpl.class);
 
-   public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("_JBM_ACTUAL_EXPIRY");
+   public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBM_ACTUAL_EXPIRY");
 
-   public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("_JBM_ORIG_DESTINATION");
+   public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("JBM_ORIG_DESTINATION");
 
-   public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_JBM_ORIG_MESSAGE_ID");
+   public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("JBM_ORIG_MESSAGE_ID");
 
-   public static final SimpleString HDR_GROUP_ID = new SimpleString("_JBM_GROUP_ID");
+   public static final SimpleString HDR_GROUP_ID = new SimpleString("JBM_GROUP_ID");
 
-   public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_JBM_SCHED_DELIVERY");
+   public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("JBM_SCHED_DELIVERY");
    
-   public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("_JBM_DUPL_ID");
+   public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("JBM_DUPL_ID");
    
-   public static final SimpleString HDR_MAX_HOPS = new SimpleString("_JBM_MAX_HOPS");
+   public static final SimpleString HDR_MAX_HOPS = new SimpleString("JBM_MAX_HOPS");
    
-   public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("_JBM_ROUTE_TO:");
+   public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("JBM_ROUTE_TO:");
    
+   public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("JBM_RESET_QUEUE_DATA");
+   
    // Attributes ----------------------------------------------------
 
    protected long messageID;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -80,4 +80,6 @@
    SendLock getAddressLock(SimpleString address);
 
    DuplicateIDCache getDuplicateIDCache(SimpleString address);
+   
+   void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
 }

Copied: trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java (from rev 5686, trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A QueueInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 20:55:06
+ *
+ *
+ */
+public class QueueInfo implements Serializable
+{
+   private static final long serialVersionUID = 3451892849198803182L;
+
+   private final SimpleString queueName;
+   
+   private final SimpleString address;
+   
+   private List<SimpleString> filterStrings;
+   
+   private int numberOfConsumers;
+
+   public QueueInfo(final SimpleString queueName, final SimpleString address)
+   {
+      this.queueName = queueName;
+      this.address = address;      
+   }
+
+   public SimpleString getQueueName()
+   {
+      return queueName;
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public List<SimpleString> getFilterStrings()
+   {
+      return filterStrings;
+   }
+   
+   public void setFilterStrings(final List<SimpleString> filterStrings)
+   {
+      this.filterStrings = filterStrings;
+   }
+
+   public int getNumberOfConsumers()
+   {
+      return numberOfConsumers;
+   }     
+   
+   public void incrementConsumers()
+   {
+      this.numberOfConsumers++;
+   }
+   
+   public void decrementConsumers()
+   {
+      this.numberOfConsumers--;
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.cluster.impl.FlowBindingFilter;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 21 Jan 2009 18:55:22
- *
- *
- */
-public class FlowBinding extends BindingImpl
-{
-   private final FlowBindingFilter filter;
-
-   public FlowBinding(final SimpleString address,
-                      final SimpleString uniqueName,
-                      final SimpleString routingName,
-                      final Bindable bindable,
-                      final FlowBindingFilter filter)
-   {
-      super(address, uniqueName, routingName, bindable, false, false);
-
-      this.filter = filter;
-   }
-
-   public boolean accept(final ServerMessage message) throws Exception
-   {
-      if (filter.match(message))
-      {
-         return bindable.accept(message);
-      }
-      else
-      {
-         return false;
-      }
-   }
-   
-   public FlowBindingFilter getFilter()
-   {
-      return filter;
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.postoffice.impl;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,9 +35,13 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationListener;
+import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagingManager;
@@ -47,12 +52,15 @@
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.DuplicateIDCache;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueInfo;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.SendLock;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.SendLockImpl;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionOperation;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -60,6 +68,7 @@
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
 
 /**
  * A PostOfficeImpl
@@ -68,7 +77,7 @@
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="csuconic at redhat.com">Clebert Suconic</a>
  */
-public class PostOfficeImpl implements PostOffice
+public class PostOfficeImpl implements PostOffice, NotificationListener
 {
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
 
@@ -143,13 +152,15 @@
 
       this.idCacheSize = idCacheSize;
 
-      this.persistIDCache = persistIDCache;
+      this.persistIDCache = persistIDCache;            
    }
 
    // MessagingComponent implementation ---------------------------------------
 
    public void start() throws Exception
    {
+      managementService.addNotificationListener(this);
+      
       if (pagingManager != null)
       {
          pagingManager.setPostOffice(this);
@@ -174,12 +185,13 @@
 
    public void stop() throws Exception
    {
+      managementService.removeNotificationListener(this);
+      
       if (messageExpiryExecutor != null)
       {
          messageExpiryExecutor.shutdown();
       }
 
-
       addressManager.clear();
 
       // Release all the locks
@@ -197,6 +209,168 @@
    {
       return started;
    }
+   
+   // NotificationListener implementation -------------------------------------
+   
+   
+   private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
+   
+   private final Object notificationLock = new Object();
+   
+   /**
+    * Updates the locally held queue information from a management notification.
+    * <p>
+    * QUEUE_CREATED adds an entry for the queue, QUEUE_DESTROYED removes it, and
+    * CONSUMER_CREATED / CONSUMER_CLOSED adjust the consumer count of the queue and
+    * record or forget the consumer's filter string, if it has one. Any other
+    * notification type is ignored. The whole update runs under notificationLock, the
+    * same lock sendQueueInfoToQueue holds while it replays the queue information.
+    *
+    * @param notification the notification to apply
+    * @throws IllegalStateException if a consumer notification names a queue for which
+    *         no queue info is held
+    */
+   public void onNotification(final Notification notification)
+   {
+      synchronized (notificationLock)
+      {
+         NotificationType type = notification.getType();
+
+         if (type == NotificationType.QUEUE_CREATED)
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+            SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
+
+            QueueInfo info = new QueueInfo(queueName, address);
+
+            queueInfos.put(queueName, info);
+         }
+         else if (type == NotificationType.QUEUE_DESTROYED)
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+            queueInfos.remove(queueName);
+         }
+         else if (type == NotificationType.CONSUMER_CREATED)
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            QueueInfo info = queueInfos.get(queueName);
+
+            if (info == null)
+            {
+               // Fail with a message naming the queue rather than with a bare NullPointerException
+               throw new IllegalStateException("Cannot find queue info for queue " + queueName);
+            }
+
+            info.incrementConsumers();
+
+            if (filterString != null)
+            {
+               List<SimpleString> filterStrings = info.getFilterStrings();
+
+               if (filterStrings == null)
+               {
+                  filterStrings = new ArrayList<SimpleString>();
+
+                  info.setFilterStrings(filterStrings);
+               }
+
+               // Record the filter: sendQueueInfoToQueue replays one message per entry of this
+               // list, and CONSUMER_CLOSED removes the entry again
+               filterStrings.add(filterString);
+            }
+         }
+         else if (type == NotificationType.CONSUMER_CLOSED)
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            QueueInfo info = queueInfos.get(queueName);
+
+            if (info == null)
+            {
+               throw new IllegalStateException("Cannot find queue info for queue " + queueName);
+            }
+
+            info.decrementConsumers();
+
+            if (filterString != null)
+            {
+               List<SimpleString> filterStrings = info.getFilterStrings();
+
+               // The list only exists once a consumer with a filter has been recorded
+               if (filterStrings != null)
+               {
+                  // Removes a single occurrence, so other consumers using an equal filter stay recorded
+                  filterStrings.remove(filterString);
+               }
+            }
+         }
+      }
+   }
+   
+   /**
+    * Creates a message with an empty body, addressed to the given queue, that carries
+    * a notification type and the current time in its notification properties.
+    *
+    * @param type the notification type stored in the HDR_NOTIFICATION_TYPE property
+    * @param queueName the name set as the destination of the message
+    * @return the new message
+    */
+   private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
+   {
+      ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+
+      message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+
+      message.setDestination(queueName);
+
+      // Store the requested type: hard-coding QUEUE_CREATED here would label the
+      // CONSUMER_CREATED messages built through this method as queue creations
+      message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
+      message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+      return message;
+   }
+   
+   /**
+    * Replays the currently held queue information directly into the named queue.
+    * <p>
+    * While holding notificationLock it first sends a message flagged with
+    * HDR_RESET_QUEUE_DATA and then, for every held QueueInfo: one message built with
+    * createQueueInfoMessage(QUEUE_CREATED, ...) carrying the address and queue name,
+    * one message built with createQueueInfoMessage(CONSUMER_CREATED, ...) per consumer
+    * that is not accounted for by a filter string, and one further such message per
+    * recorded filter string. Every message is passed to queue.accept and then to
+    * queue.route with a null second argument.
+    *
+    * @param queueName the name of the queue to send to; also set as the destination of every message
+    * @throws IllegalStateException if no binding is found for queueName
+    * @throws Exception declared because the binding lookup and the queue calls may throw
+    */
+   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+   {
+      //We send direct to the queue so we can send it to the same queue that is bound to the notifications address - this is crucial for ensuring
+      //that queue infos and notifications are received in a contiguous consistent stream
+      Binding binding = addressManager.getBinding(queueName);
+      
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find queue " + queueName);
+      }
+      
+      // NOTE(review): unchecked cast - assumes the binding found under this name wraps a Queue; confirm
+      Queue queue = (Queue)binding.getBindable();
+      
+      //Need to lock to make sure all queue info and notifications are in the correct order with no gaps
+      synchronized (notificationLock)
+      {
+         //First send a reset message
+         
+         ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID()); 
+         message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+         message.setDestination(queueName);
+         message.putBooleanProperty(MessageImpl.HDR_RESET_QUEUE_DATA, true);
+         
+         // NOTE(review): the result of accept is not inspected before routing, here and below
+         queue.accept(message);            
+         queue.route(message, null);
+                  
+         for (QueueInfo info: queueInfos.values())
+         {
+            message = createQueueInfoMessage(NotificationType.QUEUE_CREATED, queueName);
+            
+            message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+            message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+            
+            queue.accept(message);            
+            queue.route(message, null);
+            
+            // The filter string list is null until a filter has been recorded for the queue
+            int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
+            
+            // Consumers not covered by a filter string get a message without HDR_FILTERSTRING
+            for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
+            {
+               message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+               
+               message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName()); 
+               
+               queue.accept(message);            
+               queue.route(message, null);
+            }
+            
+            // One message per recorded filter string, each carrying that filter
+            if (info.getFilterStrings() != null)
+            {
+               for (SimpleString filterString: info.getFilterStrings())
+               {
+                  message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+                  
+                  message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+                  message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); 
+                  
+                  queue.accept(message);            
+                  queue.route(message, null);
+               }
+            }           
+         }
+      }
+      
+   }
 
    // PostOffice implementation -----------------------------------------------
 
@@ -285,7 +459,7 @@
    }
 
    public void route(final ServerMessage message, Transaction tx) throws Exception
-   {      
+   {            
       SimpleString address = message.getDestination();
 
       if (checkAllowable)
@@ -360,7 +534,7 @@
       }
 
       Bindings bindings = addressManager.getBindings(address);
-          
+      
       if (bindings != null)
       {
          bindings.route(message, tx);
@@ -442,7 +616,7 @@
 
       return cache;
    }
-
+   
    // Private -----------------------------------------------------------------
    
    private final PageMessageOperation getPageOperation(final Transaction tx)

Modified: trunk/src/main/org/jboss/messaging/core/server/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Consumer.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/Consumer.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,8 +22,10 @@
 
 package org.jboss.messaging.core.server;
 
+import org.jboss.messaging.util.SimpleString;
 
 
+
 /**
  * 
  * A ClientConsumer
@@ -34,4 +36,6 @@
 public interface Consumer
 {
    HandleStatus handle(MessageReference reference) throws Exception;
+   
+   SimpleString getFilterString();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Distributor.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/Distributor.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.server;
 
+import java.util.List;
+
 /**
  * 
  * A Distributor
@@ -40,4 +42,6 @@
    int getConsumerCount();
 
    boolean hasConsumers();
+   
+   List<Consumer> getConsumers();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -59,6 +59,8 @@
    boolean removeConsumer(Consumer consumer) throws Exception;
 
    int getConsumerCount();
+   
+   List<Consumer> getConsumers();
 
    void addLast(MessageReference ref);
 

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A FlowBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 23 Jan 2009 11:58:05
+ *
+ *
+ */
+public interface FlowBinding extends Binding
+{
+   /**
+    * Adds a consumer described by its filter string.
+    *
+    * @param filterString the consumer's filter string.
+    *        NOTE(review): presumably null for a consumer without a filter - confirm against the implementation
+    * @throws Exception the conditions are not specified by this interface
+    */
+   void addConsumer(SimpleString filterString) throws Exception;
+   
+   /**
+    * Removes a consumer described by its filter string.
+    *
+    * @param filterString the consumer's filter string.
+    *        NOTE(review): presumably the same value that was passed to addConsumer - confirm against the implementation
+    * @throws Exception the conditions are not specified by this interface
+    */
+   void removeConsumer(SimpleString filterString) throws Exception;
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,24 +22,36 @@
 
 package org.jboss.messaging.core.server.cluster.impl;
 
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.QueueInfo;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -53,6 +65,8 @@
 import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
+import org.jboss.messaging.util.UUIDGenerator;
 
 /**
  * A BridgeImpl
@@ -114,17 +128,19 @@
    private final boolean useDuplicateDetection;
 
    private volatile boolean active;
-   
+
    private final Pair<TransportConfiguration, TransportConfiguration> connectorPair;
-   
+
    private final long retryInterval;
-   
+
    private final double retryIntervalMultiplier;
-   
+
    private final int maxRetriesBeforeFailover;
-   
+
    private final int maxRetriesAfterFailover;
 
+   private final MessageHandler queueInfoMessageHandler;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -146,7 +162,8 @@
                      final double retryIntervalMultiplier,
                      final int maxRetriesBeforeFailover,
                      final int maxRetriesAfterFailover,
-                     final boolean useDuplicateDetection) throws Exception
+                     final boolean useDuplicateDetection,
+                     final MessageHandler queueInfoMessageHandler) throws Exception
    {
       this.name = name;
 
@@ -176,17 +193,19 @@
       this.transformer = transformer;
 
       this.useDuplicateDetection = useDuplicateDetection;
-      
+
       this.connectorPair = connectorPair;
-      
+
       this.retryInterval = retryInterval;
-      
+
       this.retryIntervalMultiplier = retryIntervalMultiplier;
-      
+
       this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
-      
+
       this.maxRetriesAfterFailover = maxRetriesAfterFailover;
 
+      this.queueInfoMessageHandler = queueInfoMessageHandler;
+
       if (maxBatchTime != -1)
       {
          future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(),
@@ -206,7 +225,7 @@
       {
          return;
       }
-      
+
       executor.execute(new CreateObjectsRunnable());
 
       started = true;
@@ -221,7 +240,7 @@
             createTx();
 
             queue.addConsumer(BridgeImpl.this);
-            
+
             csf = new ClientSessionFactoryImpl(connectorPair.a,
                                                connectorPair.b,
                                                retryInterval,
@@ -235,6 +254,42 @@
 
             session.addFailureListener(BridgeImpl.this);
 
+            // Only when a handler for queue info messages was supplied: create a uniquely
+            // named queue on the notification address, consume from it with that handler and
+            // prepare the management request asking the server to send its queue info there
+            if (queueInfoMessageHandler != null)
+            {
+               // Get the queue data
+
+               SimpleString notifQueueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
+
+               // Selects only the queue and consumer lifecycle notifications. The items of the
+               // IN list must be separated by commas and the list closed with a parenthesis;
+               // adjacent quotes ('') would otherwise be read as an escaped quote inside one literal
+               SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
+                                                      "'" +
+                                                      NotificationType.QUEUE_CREATED +
+                                                      "', " +
+                                                      "'" +
+                                                      NotificationType.QUEUE_DESTROYED +
+                                                      "', " +
+                                                      "'" +
+                                                      NotificationType.CONSUMER_CREATED +
+                                                      "', " +
+                                                      "'" +
+                                                      NotificationType.CONSUMER_CLOSED +
+                                                      "')");
+
+               session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
+
+               ClientConsumer notifConsumer = session.createConsumer(notifQueueName);
+
+               notifConsumer.setMessageHandler(queueInfoMessageHandler);
+
+               session.start();
+
+               ClientMessage message = session.createClientMessage(false);
+
+               ManagementHelper.putOperationInvocation(message,
+                                                       ManagementServiceImpl.getMessagingServerObjectName(),
+                                                       "sendQueueInfoToQueue",
+                                                       notifQueueName);
+
+               // NOTE(review): the request is populated here but nothing in this block sends it,
+               // so sendQueueInfoToQueue is not invoked from this code - confirm where it is sent
+            }
+
             active = true;
 
             queue.deliverAsync(executor);
@@ -244,7 +299,7 @@
             log.warn("Unable to connect. Bridge is now disabled.", e);
 
             active = false;
-            
+
             started = false;
          }
       }
@@ -275,7 +330,7 @@
       {
          log.warn("Timed out waiting for batch to be sent");
       }
-      
+
       csf.close();
    }
 
@@ -348,27 +403,27 @@
 
       return true;
    }
-   
+
    private void fail()
    {
       if (!started)
       {
          return;
       }
-      
+
       log.warn("Bridge connection to target failed. Will try to reconnect");
-            
+
       try
       {
          tx.rollback();
-         
+
          stop();
       }
       catch (Exception e)
       {
          log.error("Failed to stop", e);
       }
-      
+
       executor.execute(new CreateObjectsRunnable());
    }
 
@@ -433,7 +488,7 @@
             }
             else
             {
-               //Preserve the original address
+               // Preserve the original address
                dest = message.getDestination();
             }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,7 +22,8 @@
 
 package org.jboss.messaging.core.server.cluster.impl;
 
-import java.util.ArrayList;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_RESET_QUEUE_DATA;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,28 +32,29 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.impl.BindingImpl;
-import org.jboss.messaging.core.postoffice.impl.FlowBinding;
-import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
-import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.server.cluster.FlowBinding;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.ExecutorFactory;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
 
 /**
  * 
@@ -85,9 +87,9 @@
    private final boolean useDuplicateDetection;
 
    private final int maxHops;
+   
+   private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
 
-   private Map<Pair<TransportConfiguration, TransportConfiguration>, Bridge> bridges = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Bridge>();
-
    private final DiscoveryGroup discoveryGroup;
 
    private final ScheduledExecutorService scheduledExecutor;
@@ -158,7 +160,7 @@
       this.name = name;
 
       this.address = address;
-      
+
       this.bridgeConfig = bridgeConfig;
 
       this.executorFactory = executorFactory;
@@ -170,7 +172,7 @@
       this.queueSettingsRepository = queueSettingsRepository;
 
       this.scheduledExecutor = scheduledExecutor;
-      
+
       this.queueFactory = queueFactory;
 
       this.discoveryGroup = discoveryGroup;
@@ -209,9 +211,9 @@
          discoveryGroup.unregisterListener(this);
       }
 
-      for (Bridge bridge : bridges.values())
+      for (MessageFlowRecord record : records.values())
       {
-         bridge.stop();
+         record.close();
       }
 
       started = false;
@@ -249,18 +251,19 @@
 
       connectorSet.addAll(connectors);
 
-      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge>> iter = bridges.entrySet()
-                                                                                                      .iterator();
+      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet()
+                                                                                                                 .iterator();
 
       while (iter.hasNext())
       {
-         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge> entry = iter.next();
+         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
 
          if (!connectorSet.contains(entry.getKey()))
          {
-            // Connector no longer there - we should remove and close it - we don't delete the queue though - it may have messages - this is up to the admininstrator to do this
+            // Connector no longer there - we should remove and close it - we don't delete the queue though - it may
+            // have messages - this is up to the administrator to do this
 
-            entry.getValue().stop();
+            entry.getValue().close();
 
             iter.remove();
          }
@@ -268,14 +271,9 @@
 
       for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
       {
-         if (!bridges.containsKey(connectorPair))
+         if (!records.containsKey(connectorPair))
          {
-            SimpleString queueName = new SimpleString("cluster." + name +
-                                                      "." +
-                                                      generateConnectorString(connectorPair.a) +
-                                                      "-" +
-                                                      (connectorPair.b == null ? "null"
-                                                                              : generateConnectorString(connectorPair.b)));
+            SimpleString queueName = generateQueueName(name, connectorPair);
 
             Binding queueBinding = postOffice.getBinding(queueName);
 
@@ -289,12 +287,20 @@
             {
                queue = queueFactory.createQueue(-1, name, null, true, false);
 
-               // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never actually routed to at that address though
+               // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
+               // actually routed to at that address though
 
-               Binding storeBinding = new BindingImpl(queue.getName(), queue.getName(), queue.getName(), queue, false, true);
+               Binding storeBinding = new BindingImpl(queue.getName(),
+                                                      queue.getName(),
+                                                      queue.getName(),
+                                                      queue,
+                                                      false,
+                                                      true);
 
                storageManager.addQueueBinding(storeBinding);
             }
+            
+            MessageFlowRecord record = new MessageFlowRecord(queue);
 
             Bridge bridge = new BridgeImpl(queueName,
                                            queue,
@@ -311,59 +317,28 @@
                                            bridgeConfig.getRetryIntervalMultiplier(),
                                            bridgeConfig.getMaxRetriesBeforeFailover(),
                                            bridgeConfig.getMaxRetriesAfterFailover(),
-                                           false);
+                                           false,
+                                           record);
+            
+            record.setBridge(bridge);
 
-            bridges.put(connectorPair, bridge);
+            records.put(connectorPair, record);
 
             bridge.start();
          }
       }
    }
-       
-   private void updateQueueInfo(final Pair<TransportConfiguration, TransportConfiguration> connectorPair, final QueueInfo info) throws Exception
+
+   private SimpleString generateQueueName(final SimpleString clusterName,
+                                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
    {
-      Bridge bridge = this.bridges.get(connectorPair);
-      
-      if (bridge == null)
-      {
-         throw new IllegalArgumentException("Cannot find bridge for " + connectorPair);
-      }
-      
-      SimpleString uniqueName = null;  // ?????
-      
-      FlowBinding flowBinding = (FlowBinding)postOffice.getBinding(uniqueName);
-      
-      if (flowBinding == null)
-      {
-         //TODO - can be optimised by storing the queue with the bridge in this class
-         Binding binding = postOffice.getBinding(bridge.getName());
-         
-         if (binding == null)
-         {
-            throw new IllegalStateException("Cannot find queue with name " + bridge.getName());
-         }
-         
-         Queue queue = (Queue)binding.getBindable();
-           
-         FlowBindingFilter filter = new FlowBindingFilter(info);
-         
-         flowBinding = new FlowBinding(new SimpleString(info.getAddress()), uniqueName, new SimpleString(info.getQueueName()), queue, filter);
-         
-         postOffice.addBinding(flowBinding);
-      }
-      else
-      {
-         FlowBindingFilter filter = flowBinding.getFilter();
-         
-         filter.updateInfo(info);
-      }
+      return new SimpleString("cluster." + name +
+                              "." +
+                              generateConnectorString(connectorPair.a) +
+                              "-" +
+                              (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
    }
-   
-   private void removeQueueInfo()
-   {
-      //TODO
-   }
-   
+
    private String replaceWildcardChars(final String str)
    {
       return str.replace('.', '-');
@@ -397,5 +372,115 @@
 
       return new SimpleString(str.toString());
    }
+   
+   // Inner classes -----------------------------------------------------------------------------------
+   
+   private class MessageFlowRecord implements MessageHandler
+   {
+      private Bridge bridge;
 
+      private final Queue queue;
+
+      // Flow bindings this record has added to the post office, keyed by queue name
+      private final Map<SimpleString, FlowBinding> bindings = new HashMap<SimpleString, FlowBinding>();
+
+      // Becomes true once a message carrying HDR_RESET_QUEUE_DATA has been handled
+      private boolean firstReset = false;
+
+      public MessageFlowRecord(final Queue queue)
+      {
+         this.queue = queue;
+      }
+
+      /**
+       * Stops the bridge, then removes every flow binding of this record from the post office.
+       */
+      public void close() throws Exception
+      {
+         bridge.stop();
+
+         clearBindings();
+      }
+
+      public void setBridge(final Bridge bridge)
+      {
+         this.bridge = bridge;
+      }
+
+      /**
+       * Applies a notification message to the flow bindings held by this record. Any failure is logged rather
+       * than rethrown.
+       */
+      public void onMessage(final ClientMessage message)
+      {
+         try
+         {
+            // Reset the bindings
+            if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
+            {
+               clearBindings();
+
+               firstReset = true;
+            }
+
+            Object typeName = message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
+
+            // Notifications are ignored until the first reset; a message without a type has nothing to apply
+            if (!firstReset || typeName == null)
+            {
+               return;
+            }
+
+            NotificationType type = NotificationType.valueOf(typeName.toString());
+
+            if (type == NotificationType.QUEUE_CREATED)
+            {
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               if (bindings.containsKey(queueName))
+               {
+                  // Overwriting the entry would leave the old binding in the post office with no way to remove it
+                  throw new IllegalStateException("Already have a flow binding for queue " + queueName);
+               }
+
+               SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
+                                                                                       .generateSimpleStringUUID());
+
+               SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
+
+               FlowBinding binding = new FlowBindingImpl(queueAddress, uniqueName, queueName, queue, useDuplicateDetection);
+
+               // Add to the post office first, so a failure there leaves no entry behind in the map
+               postOffice.addBinding(binding);
+
+               bindings.put(queueName, binding);
+            }
+            else if (type == NotificationType.QUEUE_DESTROYED)
+            {
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               FlowBinding binding = requireBinding(queueName);
+
+               bindings.remove(queueName);
+
+               postOffice.removeBinding(binding.getUniqueName());
+            }
+            else if (type == NotificationType.CONSUMER_CREATED)
+            {
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               requireBinding(queueName).addConsumer(filterString);
+            }
+            else if (type == NotificationType.CONSUMER_CLOSED)
+            {
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               requireBinding(queueName).removeConsumer(filterString);
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to handle message", e);
+         }
+      }
+
+      /**
+       * Removes every flow binding of this record from the post office and empties the map.
+       */
+      private void clearBindings() throws Exception
+      {
+         for (FlowBinding binding : bindings.values())
+         {
+            postOffice.removeBinding(binding.getUniqueName());
+         }
+
+         bindings.clear();
+      }
+
+      /**
+       * Looks up the flow binding recorded for the queue name.
+       *
+       * @throws IllegalStateException if no binding is recorded for that queue, instead of letting the caller
+       *                               fail with a NullPointerException
+       */
+      private FlowBinding requireBinding(final SimpleString queueName)
+      {
+         FlowBinding binding = bindings.get(queueName);
+
+         if (binding == null)
+         {
+            throw new IllegalStateException("No flow binding for queue " + queueName);
+         }
+
+         return binding;
+      }
+   }
+
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -373,7 +373,8 @@
                                  config.getRetryIntervalMultiplier(),
                                  config.getMaxRetriesBeforeFailover(),
                                  config.getMaxRetriesAfterFailover(),
-                                 config.isUseDuplicateDetection());
+                                 config.isUseDuplicateDetection(),
+                                 null);
 
          log.info("put bridge " + this);
          bridges.put(config.getName(), bridge);

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -1,122 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBindingFilter
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 21 Jan 2009 20:53:20
- *
- *
- */
-public class FlowBindingFilter implements Filter
-{
-   private volatile QueueInfo info;
-   
-   private final Map<String, Filter> filters = new HashMap<String, Filter>();
-   
-   public FlowBindingFilter(final QueueInfo info) throws Exception
-   {
-      this.info = info;
-      
-      updateInfo(info);
-   }
-   
-   public void updateInfo(final QueueInfo info) throws Exception
-   {
-      this.info = info;
-      
-      List<String> filterStrings = info.getFilterStrings();
-      
-      for (String filterString: filterStrings)
-      {
-         Filter filter = filters.get(filterString);
-         
-         if (filter == null)
-         {
-            filter = new FilterImpl(new SimpleString(filterString));
-            
-            filters.put(filterString, filter);
-         }
-      }
-      
-      //TODO - this can be optimised by storing map of filters in QueueInfo
-      if (filterStrings.size() < filters.size())
-      {
-         Iterator<String> iter = filters.keySet().iterator();
-         
-         while (iter.hasNext())
-         {
-            String filterString = iter.next();
-            
-            if (!filterStrings.contains(filterString))
-            {
-               iter.remove();
-            }
-         }
-      }
-   }
-         
-   public SimpleString getFilterString()
-   {         
-      return null;
-   }
-
-   public boolean match(final ServerMessage message)
-   {
-      if (info.getNumberOfConsumers() == 0)
-      {
-         return false;
-      }
-      
-      if (filters.isEmpty())
-      {
-         return true;
-      }
-      else
-      {         
-         for (Filter filter: filters.values())
-         {
-            if (filter.match(message))
-            {
-               return true;
-            }
-         }
-         return false;
-      }
-   }
-   
-}

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,167 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.FlowBinding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A FlowBindingImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 21 Jan 2009 18:55:22
+ *
+ *
+ */
+public class FlowBindingImpl extends BindingImpl implements FlowBinding
+{
+   // Compiled filters keyed by their filter string, so a filter can be found again when its last consumer goes
+   private final Map<SimpleString, Filter> filters = new HashMap<SimpleString, Filter>();
+
+   // Number of registered consumers per filter string
+   private final Map<SimpleString, Integer> filterCounts = new HashMap<SimpleString, Integer>();
+
+   // Number of consumers registered through addConsumer and not yet removed, with or without a filter
+   private int consumerCount;
+
+   private final boolean duplicateDetection;
+
+   public FlowBindingImpl(final SimpleString address,
+                          final SimpleString uniqueName,
+                          final SimpleString routingName,
+                          final Bindable bindable,
+                          final boolean duplicateDetection)
+   {
+      super(address, uniqueName, routingName, bindable, false, false);
+
+      this.duplicateDetection = duplicateDetection;
+   }
+
+   /**
+    * Accepts the message when at least one consumer is registered and either no filters are registered or one of
+    * them matches. With duplicate detection on, an accepted message without a duplicate id header is given one.
+    * NOTE(review): unlike addConsumer/removeConsumer this method is not synchronized - confirm that is intended.
+    */
+   public boolean accept(final ServerMessage message) throws Exception
+   {
+      if (consumerCount == 0)
+      {
+         return false;
+      }
+
+      boolean accepted = filters.isEmpty();
+
+      for (Filter filter : filters.values())
+      {
+         if (filter.match(message))
+         {
+            accepted = true;
+
+            break;
+         }
+      }
+
+      if (accepted && duplicateDetection && !message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
+      {
+         // Add the message id as a duplicate id header - this will be detected when routing on the remote node -
+         // any duplicates will be rejected
+         byte[] bytes = new byte[8];
+
+         ByteBuffer.wrap(bytes).putLong(message.getMessageID());
+
+         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, new SimpleString(bytes));
+      }
+
+      return accepted;
+   }
+
+   /**
+    * Registers one more consumer on this binding.
+    *
+    * @param filterString the consumer's filter string, or null if the consumer has no filter
+    */
+   public synchronized void addConsumer(final SimpleString filterString) throws Exception
+   {
+      if (filterString != null)
+      {
+         // There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref
+         // count
+         Integer count = filterCounts.get(filterString);
+
+         if (count == null)
+         {
+            // Create the filter before recording the count, so an exception here leaves no stale count behind
+            filters.put(filterString, new FilterImpl(filterString));
+
+            // The first consumer is one reference; the count must equal the number of consumers using the filter
+            filterCounts.put(filterString, 1);
+         }
+         else
+         {
+            filterCounts.put(filterString, count + 1);
+         }
+      }
+
+      consumerCount++;
+   }
+
+   /**
+    * Unregisters one consumer from this binding.
+    *
+    * @param filterString the filter string the consumer was registered with, or null if it had no filter
+    */
+   public synchronized void removeConsumer(final SimpleString filterString) throws Exception
+   {
+      if (filterString != null)
+      {
+         Integer count = filterCounts.get(filterString);
+
+         if (count != null && count <= 1)
+         {
+            // The last consumer using this filter has gone, so the filter must stop influencing accept()
+            filterCounts.remove(filterString);
+
+            filters.remove(filterString);
+         }
+         else if (count != null)
+         {
+            filterCounts.put(filterString, count - 1);
+         }
+      }
+
+      consumerCount--;
+   }
+}

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -1,74 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import java.util.List;
-
-/**
- * A QueueInfo
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 21 Jan 2009 20:55:06
- *
- *
- */
-public class QueueInfo
-{
-   private final String queueName;
-   
-   private final String address;
-   
-   private final List<String> filterStrings;
-   
-   private final int numberOfConsumers;
-
-   public QueueInfo(final String queueName, final String address, final List<String> filterStrings, final int numberOfConsumers)
-   {
-      this.queueName = queueName;
-      this.address = address;
-      this.filterStrings = filterStrings;
-      this.numberOfConsumers = numberOfConsumers;
-   }
-
-   public String getQueueName()
-   {
-      return queueName;
-   }
-
-   public String getAddress()
-   {
-      return address;
-   }
-
-   public List<String> getFilterStrings()
-   {
-      return filterStrings;
-   }
-
-   public int getNumberOfConsumers()
-   {
-      return numberOfConsumers;
-   }            
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -53,4 +53,9 @@
    {
       return !consumers.isEmpty();
    }
+   
+   /**
+    * Returns the consumer list held by this distributor. The list itself is handed out, not a copy.
+    */
+   public List<Consumer> getConsumers()
+   {
+      return this.consumers;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -393,6 +393,11 @@
    {
       return distributionPolicy.getConsumerCount();
    }
+   
+   /**
+    * Returns a new list holding the consumers reported by the distribution policy. The copy is made while
+    * holding this queue's monitor.
+    */
+   public synchronized List<Consumer> getConsumers()
+   {
+      final List<Consumer> current = distributionPolicy.getConsumers();
+
+      return new ArrayList<Consumer>(current);
+   }
 
    public synchronized List<MessageReference> list(final Filter filter)
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -31,9 +31,13 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -57,6 +61,7 @@
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
 
 /**
  * Concrete implementation of a ClientConsumer.
@@ -127,6 +132,8 @@
    private volatile boolean closed;
 
    private final boolean preAcknowledge;
+   
+   private final ManagementService managementService;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -141,7 +148,8 @@
                              final PagingManager pagingManager,
                              final Channel channel,
                              final boolean preAcknowledge,
-                             final Executor executor)
+                             final Executor executor,
+                             final ManagementService managementService)
    {
       this.id = id;
 
@@ -166,6 +174,8 @@
       this.preAcknowledge = preAcknowledge;
 
       this.pagingManager = pagingManager;
+      
+      this.managementService = managementService;
 
       messageQueue.addConsumer(this);
 
@@ -184,6 +194,18 @@
    {
       return doHandle(ref);
    }
+   
+   /**
+    * Returns the filter string of this consumer's filter, or null when the consumer has no filter.
+    */
+   public SimpleString getFilterString()
+   {
+      return filter == null ? null : filter.getFilterString();
+   }
 
    public void handleClose(final Packet packet)
    {
@@ -272,6 +294,17 @@
       }
 
       tx.rollback();
+      
+      if (!browseOnly)
+      {
+         TypedProperties props = new TypedProperties();
+         
+         props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, messageQueue.getName());
+         
+         Notification notification = new Notification(NotificationType.CONSUMER_CLOSED, props);
+         
+         managementService.sendNotification(notification);
+      }
    }
 
    public LinkedList<MessageReference> cancelRefs() throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -95,14 +95,7 @@
    public MessageReference createReference(final Queue queue)
    {
       MessageReference ref = new MessageReferenceImpl(this, queue);
-//
-//      if (durable && queue.isDurable())
-//      {
-//         durableRefCount.incrementAndGet();
-//      }
-//      
-//      refCount.incrementAndGet();
-    
+
       return ref;
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -12,6 +12,8 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -26,11 +28,13 @@
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.client.impl.ClientMessageImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
@@ -94,6 +98,7 @@
 import org.jboss.messaging.util.IDGenerator;
 import org.jboss.messaging.util.SimpleIDGenerator;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
 
 /*
  * Session implementation 
@@ -1392,7 +1397,7 @@
             }
          }
          else
-         {
+         {            
             theQueue = (Queue)binding.getBindable();
          }
 
@@ -1407,9 +1412,26 @@
                                                           postOffice.getPagingManager(),
                                                           channel,
                                                           preAcknowledge,
-                                                          executor);
+                                                          executor,
+                                                          managementService);
 
          consumers.put(consumer.getID(), consumer);
+         
+         if (!browseOnly)
+         {
+            TypedProperties props = new TypedProperties();
+            
+            props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, name);
+            
+            if (filterString != null)
+            {
+               props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+            }
+            
+            Notification notification = new Notification(CONSUMER_CREATED, props);
+            
+            managementService.sendNotification(notification);
+         }
 
          response = new NullResponseMessage();
       }
@@ -1478,7 +1500,7 @@
          }
 
          postOffice.addBinding(binding);
-
+         
          if (temporary)
          {
             // Temporary queue in core simply means the queue will be deleted if

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -125,7 +125,7 @@
 
       // create a queue to receive the management notifications only concerning the destination
       SimpleString notifQueue = randomSimpleString();
-      SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_MESSAGE + " LIKE '%" + destinationName + "%'" );
+      SimpleString filter = new SimpleString(ManagementHelper.HDR_ADDRESS + " LIKE '%" + destinationName + "%'" );
       System.out.println(filter);
       session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, filter, false, true);
       ClientConsumer notifConsumer = session.createConsumer(notifQueue);

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -39,7 +39,6 @@
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.SimpleString;
 
-
 /**
  *
  * A FakePostOffice
@@ -54,12 +53,12 @@
    private ConcurrentHashSet<SimpleString> addresses = new ConcurrentHashSet<SimpleString>();
 
    private volatile boolean started;
-   
+
    public void addBinding(Binding binding) throws Exception
    {
       bindings.put(binding.getAddress(), binding);
    }
- 
+
    public void route(ServerMessage message, Transaction tx) throws Exception
    {
    }
@@ -148,4 +147,9 @@
       return null;
    }
 
+   // No-op in this fake post office: the queue name is ignored and nothing is sent
+   public void sendQueueInfoToQueue(SimpleString queueName) throws Exception
+   {
+   }
+
+   
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -589,6 +589,11 @@
 
       class NullConsumer implements Consumer
       {
+         // Stub: always reports null as the filter string
+         public SimpleString getFilterString()
+         {
+            return null;
+         }
+
          public HandleStatus handle(MessageReference reference)
          {
             return null;
@@ -1399,6 +1404,11 @@
 
    class DummyDistributionPolicy implements Distributor
    {
+      // Stub: always returns null rather than a list of consumers
+      public List<Consumer> getConsumers()
+      {         
+         return null;
+      }
+
       Consumer consumer;
       public Consumer select(ServerMessage message, boolean redeliver)
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java	2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java	2009-01-23 13:28:05 UTC (rev 5702)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.tests.unit.core.server.impl.fakes;
 
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -40,69 +41,75 @@
 public class FakeConsumer implements Consumer
 {
    private HandleStatus statusToReturn = HandleStatus.HANDLED;
-   
+
    private HandleStatus newStatus;
-   
+
    private int delayCountdown = 0;
-   
+
    private LinkedList<MessageReference> references = new LinkedList<MessageReference>();
-   
+
    private Filter filter;
-   
+
    public FakeConsumer()
-   {         
+   {
    }
-   
+
    public FakeConsumer(Filter filter)
    {
       this.filter = filter;
    }
-   
+
+   public SimpleString getFilterString()
+   {
+      // Stub implementation: this fake always returns null.
+      return null;
+   }
+
    public synchronized MessageReference waitForNextReference(long timeout)
    {
       while (references.isEmpty() && timeout > 0)
       {
-         long start = System.currentTimeMillis(); 
+         long start = System.currentTimeMillis();
          try
          {
             wait();
          }
          catch (InterruptedException e)
-         {                 
+         {
          }
          timeout -= (System.currentTimeMillis() - start);
       }
-      
+
       if (timeout <= 0)
       {
          throw new IllegalStateException("Timed out waiting for reference");
       }
-      
+
       return references.removeFirst();
    }
-   
+
    public synchronized void setStatusImmediate(HandleStatus newStatus)
    {
       this.statusToReturn = newStatus;
    }
-   
+
    public synchronized void setStatusDelayed(HandleStatus newStatus, int numReferences)
    {
       this.newStatus = newStatus;
-      
+
       this.delayCountdown = numReferences;
    }
-   
+
    public synchronized List<MessageReference> getReferences()
    {
       return references;
    }
-   
+
    public synchronized void clearReferences()
    {
       this.references.clear();
    }
-   
+
    public synchronized HandleStatus handle(MessageReference reference)
    {
       if (filter != null)
@@ -112,7 +119,7 @@
             references.addLast(reference);
             reference.getQueue().referenceHandled();
             notify();
-            
+
             return HandleStatus.HANDLED;
          }
          else
@@ -120,28 +127,28 @@
             return HandleStatus.NO_MATCH;
          }
       }
-      
+
       if (newStatus != null)
-      {           
+      {
          if (delayCountdown == 0)
          {
             statusToReturn = newStatus;
-            
+
             newStatus = null;
          }
          else
-         {            
+         {
             delayCountdown--;
          }
       }
-      
+
       if (statusToReturn == HandleStatus.HANDLED)
       {
          reference.getQueue().referenceHandled();
          references.addLast(reference);
          notify();
       }
-      
+
       return statusToReturn;
    }
 }




More information about the jboss-cvs-commits mailing list