[jboss-cvs] JBoss Messaging SVN: r5702 - in trunk: src/main/org/jboss/messaging/core/client/management/impl and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 23 08:28:05 EST 2009
Author: timfox
Date: 2009-01-23 08:28:05 -0500 (Fri, 23 Jan 2009)
New Revision: 5702
Added:
trunk/src/main/org/jboss/messaging/core/management/Notification.java
trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java
trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/Consumer.java
trunk/src/main/org/jboss/messaging/core/server/Distributor.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
More clustering work
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -233,7 +233,7 @@
SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
- if (msg.getEncodeSize() > minLargeMessageSize)
+ if (msg.getEncodeSize() >= minLargeMessageSize)
{
sendMessageInChunks(sendBlocking, msg);
}
@@ -255,7 +255,7 @@
{
int headerSize = msg.getPropertiesEncodeSize();
- if (headerSize > minLargeMessageSize)
+ if (headerSize >= minLargeMessageSize)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE,
"Header size (" + headerSize + ") is too big, use the messageBody for large data, or increase minLargeMessageSize");
Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -48,24 +48,28 @@
// Constants -----------------------------------------------------
- public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("JBMJMXObjectName");
+ public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("JBM_JMX_ObjectName");
- public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("JBMJMXAttribute.");
+ public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("JBM_JMXAttribute.");
- public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("JBMJMXOperation.");
+ public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("JBM_JMXOperation.");
public static final SimpleString HDR_JMX_OPERATION_NAME = new SimpleString(HDR_JMX_OPERATION_PREFIX + "name");
- public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("JBMJMXOperationSucceeded");
+ public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("JBM_JMXOperationSucceeded");
- public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBMJMXOperationException");
+ public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBM_JMXOperationException");
- public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBMNotifType");
+ public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBM_NotifType");
- public static final SimpleString HDR_NOTIFICATION_MESSAGE = new SimpleString("JBMNotifMessage");
+ public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBM_NotifTimestamp");
+
+ public static final SimpleString HDR_QUEUE_NAME = new SimpleString("JBM_QueueName");
+
+ public static final SimpleString HDR_ADDRESS = new SimpleString("JBM_Address");
+
+ public static final SimpleString HDR_FILTERSTRING = new SimpleString("JBM_FilterString");
- public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBMNotifTimestamp");
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -33,7 +33,6 @@
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
-import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -44,6 +43,7 @@
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -108,10 +108,8 @@
public Object getResource(ObjectName objectName);
- void handleMessage(Message message);
+ void handleMessage(ServerMessage message);
- void sendNotification(NotificationType type, String message) throws Exception;
-
/**
* the message corresponding to a notification will always contain the properties:
* <ul>
@@ -124,7 +122,11 @@
*
* @see ManagementHelper
*/
- void sendNotification(NotificationType type, String message, TypedProperties props) throws Exception;
+ void sendNotification(Notification notification) throws Exception;
void enableNotifications(boolean enable);
+
+ void addNotificationListener(NotificationListener listener);
+
+ void removeNotificationListener(NotificationListener listener);
}
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -30,6 +30,7 @@
import javax.management.openmbean.TabularData;
import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.util.SimpleString;
/**
* This interface describes the core management interface exposed by the server
@@ -164,5 +165,7 @@
String connectionID);
TabularData getConnectors() throws Exception;
+
+ void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
}
Added: trunk/src/main/org/jboss/messaging/core/management/Notification.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/Notification.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/Notification.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.management;
+
+import org.jboss.messaging.util.TypedProperties;
+
+/**
+ * A Notification
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 22 Jan 2009 16:41:12
+ *
+ *
+ */
+public class Notification
+{
+ private final NotificationType type;
+
+ private final TypedProperties properties;
+
+ public Notification(final NotificationType type, final TypedProperties properties)
+ {
+ this.type = type;
+ this.properties = properties;
+ }
+
+ public NotificationType getType()
+ {
+ return type;
+ }
+
+ public TypedProperties getProperties()
+ {
+ return properties;
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationListener.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.management;
+
+/**
+ * A NotificationListener
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 22 Jan 2009 16:48:27
+ *
+ *
+ */
+public interface NotificationListener
+{
+ void onNotification(Notification notification);
+}
Modified: trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationType.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationType.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -31,5 +31,5 @@
*/
public enum NotificationType
{
- QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED;
+ QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED, CONSUMER_CREATED, CONSUMER_CLOSED;
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -55,11 +55,12 @@
import org.jboss.messaging.core.management.DiscoveryGroupControlMBean;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationListener;
import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareAddressControlWrapper;
import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareMessagingServerControlWrapper;
import org.jboss.messaging.core.management.jmx.impl.ReplicationAwareQueueControlWrapper;
-import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.messagecounter.MessageCounter;
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -79,11 +80,13 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
/*
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:fox at redhat.com">Tim Fox</a>
*
* @version <tt>$Revision$</tt>
*/
@@ -122,7 +125,9 @@
private boolean started = false;
private boolean noticationsEnabled;
-
+
+ private final Set<NotificationListener> listeners = new ConcurrentHashSet<NotificationListener>();
+
// Static --------------------------------------------------------
public static ObjectName getMessagingServerObjectName() throws Exception
@@ -233,19 +238,31 @@
AddressControl addressControl = new AddressControl(address, postOffice, securityRepository);
registerInJMX(objectName, new ReplicationAwareAddressControlWrapper(objectName, addressControl));
+
registerInRegistry(objectName, addressControl);
+
if (log.isDebugEnabled())
{
log.debug("registered address " + objectName);
}
- sendNotification(NotificationType.ADDRESS_ADDED, address.toString());
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+
+ sendNotification(new Notification(NotificationType.ADDRESS_ADDED, props));
}
public void unregisterAddress(final SimpleString address) throws Exception
{
ObjectName objectName = getAddressObjectName(address);
+
unregisterResource(objectName);
- sendNotification(NotificationType.ADDRESS_REMOVED, address.toString());
+
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+
+ sendNotification(new Notification(NotificationType.ADDRESS_REMOVED, props));
}
public void registerQueue(final Queue queue, final SimpleString address, final StorageManager storageManager) throws Exception
@@ -266,7 +283,13 @@
{
log.debug("registered queue " + objectName);
}
- sendNotification(NotificationType.QUEUE_CREATED, queue.getName().toString());
+
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, queue.getName());
+
+ sendNotification(new Notification(NotificationType.QUEUE_CREATED, props));
}
public void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
@@ -274,8 +297,13 @@
ObjectName objectName = getQueueObjectName(address, name);
unregisterResource(objectName);
messageCounterManager.unregisterMessageCounter(name.toString());
+
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, name);
- sendNotification(NotificationType.QUEUE_DESTROYED, name.toString());
+ sendNotification(new Notification(NotificationType.QUEUE_DESTROYED, props));
}
public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
@@ -334,7 +362,7 @@
unregisterResource(objectName);
}
- public void handleMessage(final Message message)
+ public void handleMessage(final ServerMessage message)
{
SimpleString objectName = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_OBJECTNAME);
if (log.isDebugEnabled())
@@ -425,6 +453,16 @@
registry.put(objectName, managedResource);
}
+ public void addNotificationListener(final NotificationListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ public void removeNotificationListener(final NotificationListener listener)
+ {
+ listeners.remove(listener);
+ }
+
// MessagingComponent implementation -----------------------------
public void start() throws Exception
@@ -476,45 +514,53 @@
}
}
}
-
- public void sendNotification(final NotificationType type, final String message) throws Exception
- {
- sendNotification(type, message, null);
- }
-
- public void sendNotification(final NotificationType type, final String message, TypedProperties props) throws Exception
- {
- // TODO - we need a parameter to determine if the notification is durable or not
+
+ public void sendNotification(final Notification notification) throws Exception
+ {
if (managedServer != null && noticationsEnabled)
{
- ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
- notificationMessage.setDestination(managementNotificationAddress);
- notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
-
- TypedProperties notifProps;
- if (props != null)
+ //This needs to be synchronized since we need to ensure notifications are processed in strict sequence
+ synchronized (this)
{
- notifProps = props;
+ //First send to any local listeners
+ for (NotificationListener listener: listeners)
+ {
+ try
+ {
+ listener.onNotification(notification);
+ }
+ catch (Exception e)
+ {
+ //Exception thrown from one listener should not stop execution of others
+ log.error("Failed to call listener", e);
+ }
+ }
+
+ //Now send message
+
+ ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+ notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+ //Notification messages are always durable so the user can choose whether to add a durable queue to consume them in
+ notificationMessage.setDurable(true);
+ notificationMessage.setDestination(managementNotificationAddress);
+
+ TypedProperties notifProps;
+ if (notification.getProperties() != null)
+ {
+ notifProps = notification.getProperties();
+ }
+ else
+ {
+ notifProps = new TypedProperties();
+ }
+
+ notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));
+ notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+ notificationMessage.putTypedProperties(notifProps);
+
+ postOffice.route(notificationMessage, null);
}
- else
- {
- notifProps = new TypedProperties();
- }
-
- notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
- notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_MESSAGE, new SimpleString(message));
- notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-
- notificationMessage.putTypedProperties(notifProps);
-
- // List<MessageReference> refs = postOffice.route(notificationMessage);
- //
- // for (MessageReference ref : refs)
- // {
- // ref.getQueue().add(ref);
- // }
-
- postOffice.route(notificationMessage, null);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -57,7 +57,6 @@
import org.jboss.messaging.core.postoffice.impl.BindingImpl;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.RemotingService;
-import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
@@ -544,6 +543,11 @@
Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations().values();
return TransportConfigurationInfo.toTabularData(connectorConfigurations);
}
+
+ public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+ {
+ postOffice.sendQueueInfoToQueue(queueName);
+ }
// NotificationEmitter implementation ----------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -23,7 +23,6 @@
package org.jboss.messaging.core.management.jmx.impl;
import java.util.List;
-import java.util.Map;
import javax.management.MBeanInfo;
import javax.management.ObjectName;
@@ -33,6 +32,7 @@
import org.jboss.messaging.core.management.MessagingServerControlMBean;
import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
import org.jboss.messaging.core.management.impl.MessagingServerControl;
+import org.jboss.messaging.util.SimpleString;
/**
* A ReplicationAwareMessagingServerControlWrapper
@@ -64,12 +64,11 @@
// MessagingServerControlMBean implementation ------------------------------
-
public String getBackupConnectorName()
{
return localControl.getBackupConnectorName();
}
-
+
public String getBindingsDirectory()
{
return localControl.getBindingsDirectory();
@@ -234,12 +233,17 @@
{
return localControl.listSessions(connectionID);
}
-
+
public TabularData getConnectors() throws Exception
{
return localControl.getConnectors();
}
-
+
+ public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+ {
+ localControl.sendQueueInfoToQueue(queueName);
+ }
+
public boolean addAddress(String address) throws Exception
{
return (Boolean)replicationAwareInvoke("addAddress", address);
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -55,22 +55,24 @@
private static final Logger log = Logger.getLogger(MessageImpl.class);
- public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("_JBM_ACTUAL_EXPIRY");
+ public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBM_ACTUAL_EXPIRY");
- public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("_JBM_ORIG_DESTINATION");
+ public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("JBM_ORIG_DESTINATION");
- public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_JBM_ORIG_MESSAGE_ID");
+ public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("JBM_ORIG_MESSAGE_ID");
- public static final SimpleString HDR_GROUP_ID = new SimpleString("_JBM_GROUP_ID");
+ public static final SimpleString HDR_GROUP_ID = new SimpleString("JBM_GROUP_ID");
- public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_JBM_SCHED_DELIVERY");
+ public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("JBM_SCHED_DELIVERY");
- public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("_JBM_DUPL_ID");
+ public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("JBM_DUPL_ID");
- public static final SimpleString HDR_MAX_HOPS = new SimpleString("_JBM_MAX_HOPS");
+ public static final SimpleString HDR_MAX_HOPS = new SimpleString("JBM_MAX_HOPS");
- public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("_JBM_ROUTE_TO:");
+ public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("JBM_ROUTE_TO:");
+ public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("JBM_RESET_QUEUE_DATA");
+
// Attributes ----------------------------------------------------
protected long messageID;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -80,4 +80,6 @@
SendLock getAddressLock(SimpleString address);
DuplicateIDCache getDuplicateIDCache(SimpleString address);
+
+ void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
}
Copied: trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java (from rev 5686, trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A QueueInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 20:55:06
+ *
+ *
+ */
+public class QueueInfo implements Serializable
+{
+ private static final long serialVersionUID = 3451892849198803182L;
+
+ private final SimpleString queueName;
+
+ private final SimpleString address;
+
+ private List<SimpleString> filterStrings;
+
+ private int numberOfConsumers;
+
+ public QueueInfo(final SimpleString queueName, final SimpleString address)
+ {
+ this.queueName = queueName;
+ this.address = address;
+ }
+
+ public SimpleString getQueueName()
+ {
+ return queueName;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public List<SimpleString> getFilterStrings()
+ {
+ return filterStrings;
+ }
+
+ public void setFilterStrings(final List<SimpleString> filterStrings)
+ {
+ this.filterStrings = filterStrings;
+ }
+
+ public int getNumberOfConsumers()
+ {
+ return numberOfConsumers;
+ }
+
+ public void incrementConsumers()
+ {
+ this.numberOfConsumers++;
+ }
+
+ public void decrementConsumers()
+ {
+ this.numberOfConsumers--;
+ }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/FlowBinding.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.cluster.impl.FlowBindingFilter;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 21 Jan 2009 18:55:22
- *
- *
- */
-public class FlowBinding extends BindingImpl
-{
- private final FlowBindingFilter filter;
-
- public FlowBinding(final SimpleString address,
- final SimpleString uniqueName,
- final SimpleString routingName,
- final Bindable bindable,
- final FlowBindingFilter filter)
- {
- super(address, uniqueName, routingName, bindable, false, false);
-
- this.filter = filter;
- }
-
- public boolean accept(final ServerMessage message) throws Exception
- {
- if (filter.match(message))
- {
- return bindable.accept(message);
- }
- else
- {
- return false;
- }
- }
-
- public FlowBindingFilter getFilter()
- {
- return filter;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.postoffice.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -34,9 +35,13 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationListener;
+import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
@@ -47,12 +52,15 @@
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueInfo;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.SendLock;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.SendLockImpl;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionOperation;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
@@ -60,6 +68,7 @@
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
/**
* A PostOfficeImpl
@@ -68,7 +77,7 @@
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
* @author <a href="csuconic at redhat.com">Clebert Suconic</a>
*/
-public class PostOfficeImpl implements PostOffice
+public class PostOfficeImpl implements PostOffice, NotificationListener
{
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
@@ -143,13 +152,15 @@
this.idCacheSize = idCacheSize;
- this.persistIDCache = persistIDCache;
+ this.persistIDCache = persistIDCache;
}
// MessagingComponent implementation ---------------------------------------
public void start() throws Exception
{
+ managementService.addNotificationListener(this);
+
if (pagingManager != null)
{
pagingManager.setPostOffice(this);
@@ -174,12 +185,13 @@
public void stop() throws Exception
{
+ managementService.removeNotificationListener(this);
+
if (messageExpiryExecutor != null)
{
messageExpiryExecutor.shutdown();
}
-
addressManager.clear();
// Release all the locks
@@ -197,6 +209,168 @@
{
return started;
}
+
+ // NotificationListener implementation -------------------------------------
+
+
+ private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
+
+ private final Object notificationLock = new Object();
+
+ /**
+  * Updates {@code queueInfos} from queue and consumer life-cycle notifications.
+  * <p>
+  * Handled types are QUEUE_CREATED, QUEUE_DESTROYED, CONSUMER_CREATED and CONSUMER_CLOSED;
+  * every other type is ignored. All work happens under {@code notificationLock}, the same
+  * lock {@code sendQueueInfoToQueue} holds while replaying the state.
+  * <p>
+  * For each queue the filter string list holds one entry per consumer that was created
+  * with a filter, so its size can be subtracted from the consumer count to obtain the
+  * number of unfiltered consumers.
+  *
+  * @param notification the notification to apply
+  * @throws IllegalStateException if a consumer notification names a queue that is not tracked
+  */
+ public void onNotification(final Notification notification)
+ {
+    synchronized (notificationLock)
+    {
+       NotificationType type = notification.getType();
+
+       if (type == NotificationType.QUEUE_CREATED)
+       {
+          TypedProperties props = notification.getProperties();
+
+          SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+          SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
+
+          queueInfos.put(queueName, new QueueInfo(queueName, address));
+       }
+       else if (type == NotificationType.QUEUE_DESTROYED)
+       {
+          TypedProperties props = notification.getProperties();
+
+          SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+          queueInfos.remove(queueName);
+       }
+       else if (type == NotificationType.CONSUMER_CREATED)
+       {
+          TypedProperties props = notification.getProperties();
+
+          SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+          SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+          QueueInfo info = queueInfos.get(queueName);
+
+          if (info == null)
+          {
+             // Fail with a descriptive message instead of a bare NullPointerException
+             throw new IllegalStateException("Cannot find queue info for queue " + queueName);
+          }
+
+          info.incrementConsumers();
+
+          if (filterString != null)
+          {
+             List<SimpleString> filterStrings = info.getFilterStrings();
+
+             if (filterStrings == null)
+             {
+                filterStrings = new ArrayList<SimpleString>();
+
+                info.setFilterStrings(filterStrings);
+             }
+
+             // Record the filter itself: without this the list stays empty, so filtered
+             // consumers are replayed as unfiltered ones and never removed on close
+             filterStrings.add(filterString);
+          }
+       }
+       else if (type == NotificationType.CONSUMER_CLOSED)
+       {
+          TypedProperties props = notification.getProperties();
+
+          SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+          SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+          QueueInfo info = queueInfos.get(queueName);
+
+          if (info == null)
+          {
+             throw new IllegalStateException("Cannot find queue info for queue " + queueName);
+          }
+
+          info.decrementConsumers();
+
+          if (filterString != null)
+          {
+             List<SimpleString> filterStrings = info.getFilterStrings();
+
+             // The list is only created once a filtered consumer has been seen
+             if (filterStrings != null)
+             {
+                // Removes a single occurrence, matching the one entry added on creation
+                filterStrings.remove(filterString);
+             }
+          }
+       }
+    }
+ }
+
+ /**
+  * Builds an empty-bodied server message that looks like a management notification of the
+  * given type, for delivery to {@code queueName}.
+  * <p>
+  * Only the notification type and timestamp headers are set here; callers add the
+  * queue name, address and filter string properties as required.
+  *
+  * @param type the notification type to stamp on the message
+  * @param queueName the destination set on the message
+  * @return the new message
+  */
+ private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
+ {
+    ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+    message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+
+    message.setDestination(queueName);
+
+    // Stamp the requested type: this used to be hard-coded to QUEUE_CREATED, so replayed
+    // CONSUMER_CREATED messages were indistinguishable from queue creations
+    message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
+    message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+    return message;
+ }
+
+ public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+ {
+ //We send direct to the queue so we can send it to the same queue that is bound to the notifications adress - this is crucial for ensuring
+ //that queue infos and notifications are received in a contiguous consistent stream
+ Binding binding = addressManager.getBinding(queueName);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find queue " + queueName);
+ }
+
+ Queue queue = (Queue)binding.getBindable();
+
+ //Need to lock to make sure all queue info and notifications are in the correct order with no gaps
+ synchronized (notificationLock)
+ {
+ //First send a reset message
+
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+ message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+ message.setDestination(queueName);
+ message.putBooleanProperty(MessageImpl.HDR_RESET_QUEUE_DATA, true);
+
+ queue.accept(message);
+ queue.route(message, null);
+
+ for (QueueInfo info: queueInfos.values())
+ {
+ message = createQueueInfoMessage(NotificationType.QUEUE_CREATED, queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+
+ queue.accept(message);
+ queue.route(message, null);
+
+ int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
+
+ for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
+ {
+ message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+
+ queue.accept(message);
+ queue.route(message, null);
+ }
+
+ if (info.getFilterStrings() != null)
+ {
+ for (SimpleString filterString: info.getFilterStrings())
+ {
+ message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+ message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+
+ queue.accept(message);
+ queue.route(message, null);
+ }
+ }
+ }
+ }
+
+ }
// PostOffice implementation -----------------------------------------------
@@ -285,7 +459,7 @@
}
public void route(final ServerMessage message, Transaction tx) throws Exception
- {
+ {
SimpleString address = message.getDestination();
if (checkAllowable)
@@ -360,7 +534,7 @@
}
Bindings bindings = addressManager.getBindings(address);
-
+
if (bindings != null)
{
bindings.route(message, tx);
@@ -442,7 +616,7 @@
return cache;
}
-
+
// Private -----------------------------------------------------------------
private final PageMessageOperation getPageOperation(final Transaction tx)
Modified: trunk/src/main/org/jboss/messaging/core/server/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Consumer.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/Consumer.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,8 +22,10 @@
package org.jboss.messaging.core.server;
+import org.jboss.messaging.util.SimpleString;
+
/**
*
* A ClientConsumer
@@ -34,4 +36,6 @@
public interface Consumer
{
HandleStatus handle(MessageReference reference) throws Exception;
+
+ SimpleString getFilterString();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Distributor.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/Distributor.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.server;
+import java.util.List;
+
/**
*
* A Distributor
@@ -40,4 +42,6 @@
int getConsumerCount();
boolean hasConsumers();
+
+ List<Consumer> getConsumers();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -59,6 +59,8 @@
boolean removeConsumer(Consumer consumer) throws Exception;
int getConsumerCount();
+
+ List<Consumer> getConsumers();
void addLast(MessageReference ref);
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A {@link Binding} that can additionally be told about consumers coming and going,
+ * each identified only by its filter string.
+ * <p>
+ * {@code addConsumer(filterString)} registers one consumer and
+ * {@code removeConsumer(filterString)} unregisters one. In this change set both are
+ * driven by ClusterImpl's message flow record: CONSUMER_CREATED notifications lead to
+ * {@code addConsumer} and CONSUMER_CLOSED notifications to {@code removeConsumer}. The
+ * filter string passed in is taken from the notification's filter string property, which
+ * is not set for consumers without a filter, so implementations should presumably accept
+ * {@code null} - confirm against the implementing class.
+ * <p>
+ * Both methods may throw {@code Exception}; the conditions are implementation-defined.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 23 Jan 2009 11:58:05
+ *
+ *
+ */
+public interface FlowBinding extends Binding
+{
+ void addConsumer(SimpleString filterString) throws Exception;
+
+ void removeConsumer(SimpleString filterString) throws Exception;
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,24 +22,36 @@
package org.jboss.messaging.core.server.cluster.impl;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.QueueInfo;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.server.HandleStatus;
@@ -53,6 +65,8 @@
import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
+import org.jboss.messaging.util.UUIDGenerator;
/**
* A BridgeImpl
@@ -114,17 +128,19 @@
private final boolean useDuplicateDetection;
private volatile boolean active;
-
+
private final Pair<TransportConfiguration, TransportConfiguration> connectorPair;
-
+
private final long retryInterval;
-
+
private final double retryIntervalMultiplier;
-
+
private final int maxRetriesBeforeFailover;
-
+
private final int maxRetriesAfterFailover;
+ private final MessageHandler queueInfoMessageHandler;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -146,7 +162,8 @@
final double retryIntervalMultiplier,
final int maxRetriesBeforeFailover,
final int maxRetriesAfterFailover,
- final boolean useDuplicateDetection) throws Exception
+ final boolean useDuplicateDetection,
+ final MessageHandler queueInfoMessageHandler) throws Exception
{
this.name = name;
@@ -176,17 +193,19 @@
this.transformer = transformer;
this.useDuplicateDetection = useDuplicateDetection;
-
+
this.connectorPair = connectorPair;
-
+
this.retryInterval = retryInterval;
-
+
this.retryIntervalMultiplier = retryIntervalMultiplier;
-
+
this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
-
+
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
+ this.queueInfoMessageHandler = queueInfoMessageHandler;
+
if (maxBatchTime != -1)
{
future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(),
@@ -206,7 +225,7 @@
{
return;
}
-
+
executor.execute(new CreateObjectsRunnable());
started = true;
@@ -221,7 +240,7 @@
createTx();
queue.addConsumer(BridgeImpl.this);
-
+
csf = new ClientSessionFactoryImpl(connectorPair.a,
connectorPair.b,
retryInterval,
@@ -235,6 +254,42 @@
session.addFailureListener(BridgeImpl.this);
+ if (queueInfoMessageHandler != null)
+ {
+ // Get the queue data
+
+ SimpleString notifQueueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
+
+ SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
+ "'" +
+ NotificationType.QUEUE_CREATED +
+ "'" +
+ "'" +
+ NotificationType.QUEUE_DESTROYED +
+ "'" +
+ "'" +
+ NotificationType.CONSUMER_CREATED +
+ "'" +
+ "'" +
+ NotificationType.CONSUMER_CLOSED +
+ "'");
+
+ session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
+
+ ClientConsumer notifConsumer = session.createConsumer(notifQueueName);
+
+ notifConsumer.setMessageHandler(queueInfoMessageHandler);
+
+ session.start();
+
+ ClientMessage message = session.createClientMessage(false);
+
+ ManagementHelper.putOperationInvocation(message,
+ ManagementServiceImpl.getMessagingServerObjectName(),
+ "sendQueueInfoToQueue",
+ notifQueueName);
+ }
+
active = true;
queue.deliverAsync(executor);
@@ -244,7 +299,7 @@
log.warn("Unable to connect. Bridge is now disabled.", e);
active = false;
-
+
started = false;
}
}
@@ -275,7 +330,7 @@
{
log.warn("Timed out waiting for batch to be sent");
}
-
+
csf.close();
}
@@ -348,27 +403,27 @@
return true;
}
-
+
private void fail()
{
if (!started)
{
return;
}
-
+
log.warn("Bridge connection to target failed. Will try to reconnect");
-
+
try
{
tx.rollback();
-
+
stop();
}
catch (Exception e)
{
log.error("Failed to stop", e);
}
-
+
executor.execute(new CreateObjectsRunnable());
}
@@ -433,7 +488,7 @@
}
else
{
- //Preserve the original address
+ // Preserve the original address
dest = message.getDestination();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -22,7 +22,8 @@
package org.jboss.messaging.core.server.cluster.impl;
-import java.util.ArrayList;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_RESET_QUEUE_DATA;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,28 +32,29 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.BindingImpl;
-import org.jboss.messaging.core.postoffice.impl.FlowBinding;
-import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.server.cluster.FlowBinding;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.ExecutorFactory;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
/**
*
@@ -85,9 +87,9 @@
private final boolean useDuplicateDetection;
private final int maxHops;
+
+ private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
- private Map<Pair<TransportConfiguration, TransportConfiguration>, Bridge> bridges = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Bridge>();
-
private final DiscoveryGroup discoveryGroup;
private final ScheduledExecutorService scheduledExecutor;
@@ -158,7 +160,7 @@
this.name = name;
this.address = address;
-
+
this.bridgeConfig = bridgeConfig;
this.executorFactory = executorFactory;
@@ -170,7 +172,7 @@
this.queueSettingsRepository = queueSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
-
+
this.queueFactory = queueFactory;
this.discoveryGroup = discoveryGroup;
@@ -209,9 +211,9 @@
discoveryGroup.unregisterListener(this);
}
- for (Bridge bridge : bridges.values())
+ for (MessageFlowRecord record : records.values())
{
- bridge.stop();
+ record.close();
}
started = false;
@@ -249,18 +251,19 @@
connectorSet.addAll(connectors);
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge>> iter = bridges.entrySet()
- .iterator();
+ Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet()
+ .iterator();
while (iter.hasNext())
{
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Bridge> entry = iter.next();
+ Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
if (!connectorSet.contains(entry.getKey()))
{
- // Connector no longer there - we should remove and close it - we don't delete the queue though - it may have messages - this is up to the admininstrator to do this
+ // Connector no longer there - we should remove and close it - we don't delete the queue though - it may
+ // have messages - this is up to the administrator to do this
- entry.getValue().stop();
+ entry.getValue().close();
iter.remove();
}
@@ -268,14 +271,9 @@
for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
{
- if (!bridges.containsKey(connectorPair))
+ if (!records.containsKey(connectorPair))
{
- SimpleString queueName = new SimpleString("cluster." + name +
- "." +
- generateConnectorString(connectorPair.a) +
- "-" +
- (connectorPair.b == null ? "null"
- : generateConnectorString(connectorPair.b)));
+ SimpleString queueName = generateQueueName(name, connectorPair);
Binding queueBinding = postOffice.getBinding(queueName);
@@ -289,12 +287,20 @@
{
queue = queueFactory.createQueue(-1, name, null, true, false);
- // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never actually routed to at that address though
+ // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
+ // actually routed to at that address though
- Binding storeBinding = new BindingImpl(queue.getName(), queue.getName(), queue.getName(), queue, false, true);
+ Binding storeBinding = new BindingImpl(queue.getName(),
+ queue.getName(),
+ queue.getName(),
+ queue,
+ false,
+ true);
storageManager.addQueueBinding(storeBinding);
}
+
+ MessageFlowRecord record = new MessageFlowRecord(queue);
Bridge bridge = new BridgeImpl(queueName,
queue,
@@ -311,59 +317,28 @@
bridgeConfig.getRetryIntervalMultiplier(),
bridgeConfig.getMaxRetriesBeforeFailover(),
bridgeConfig.getMaxRetriesAfterFailover(),
- false);
+ false,
+ record);
+
+ record.setBridge(bridge);
- bridges.put(connectorPair, bridge);
+ records.put(connectorPair, record);
bridge.start();
}
}
}
-
- private void updateQueueInfo(final Pair<TransportConfiguration, TransportConfiguration> connectorPair, final QueueInfo info) throws Exception
+
+ private SimpleString generateQueueName(final SimpleString clusterName,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
{
- Bridge bridge = this.bridges.get(connectorPair);
-
- if (bridge == null)
- {
- throw new IllegalArgumentException("Cannot find bridge for " + connectorPair);
- }
-
- SimpleString uniqueName = null; // ?????
-
- FlowBinding flowBinding = (FlowBinding)postOffice.getBinding(uniqueName);
-
- if (flowBinding == null)
- {
- //TODO - can be optimised by storing the queue with the bridge in this class
- Binding binding = postOffice.getBinding(bridge.getName());
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find queue with name " + bridge.getName());
- }
-
- Queue queue = (Queue)binding.getBindable();
-
- FlowBindingFilter filter = new FlowBindingFilter(info);
-
- flowBinding = new FlowBinding(new SimpleString(info.getAddress()), uniqueName, new SimpleString(info.getQueueName()), queue, filter);
-
- postOffice.addBinding(flowBinding);
- }
- else
- {
- FlowBindingFilter filter = flowBinding.getFilter();
-
- filter.updateInfo(info);
- }
+ return new SimpleString("cluster." + name +
+ "." +
+ generateConnectorString(connectorPair.a) +
+ "-" +
+ (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
}
-
- private void removeQueueInfo()
- {
- //TODO
- }
-
+
private String replaceWildcardChars(final String str)
{
return str.replace('.', '-');
@@ -397,5 +372,115 @@
return new SimpleString(str.toString());
}
+
+ // Inner classes -----------------------------------------------------------------------------------
+
+ /**
+  * Per-connector state of a message flow: the bridge that forwards messages, the local
+  * queue feeding it, and the flow bindings created from notification messages received
+  * through that bridge.
+  * <p>
+  * As a {@link MessageHandler} it consumes the queue-info/notification stream: a reset
+  * marker drops all known bindings, and QUEUE_CREATED, QUEUE_DESTROYED, CONSUMER_CREATED
+  * and CONSUMER_CLOSED messages add, remove or update the binding for the named queue.
+  */
+ private class MessageFlowRecord implements MessageHandler
+ {
+    private Bridge bridge;
+
+    private final Queue queue;
+
+    // Flow bindings keyed by the name of the queue they stand for
+    private final Map<SimpleString, FlowBinding> bindings = new HashMap<SimpleString, FlowBinding>();
+
+    // Becomes true once the first reset marker has been seen; messages before that are ignored
+    private boolean firstReset = false;
+
+    /**
+     * @param queue the local queue that new flow bindings are attached to
+     */
+    public MessageFlowRecord(final Queue queue)
+    {
+       this.queue = queue;
+    }
+
+    /**
+     * Stops the bridge and removes every flow binding of this record from the post office.
+     *
+     * @throws Exception if stopping the bridge or removing a binding fails
+     */
+    public void close() throws Exception
+    {
+       bridge.stop();
+
+       for (FlowBinding binding : bindings.values())
+       {
+          postOffice.removeBinding(binding.getUniqueName());
+       }
+    }
+
+    /**
+     * @param bridge the bridge to stop when this record is closed
+     */
+    public void setBridge(final Bridge bridge)
+    {
+       this.bridge = bridge;
+    }
+
+    /**
+     * Applies one message of the queue-info/notification stream. Failures are logged and
+     * never propagated to the caller.
+     *
+     * @param message the reset marker or notification message to apply
+     */
+    public void onMessage(final ClientMessage message)
+    {
+       try
+       {
+          // Reset the bindings
+          if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
+          {
+             for (FlowBinding binding : bindings.values())
+             {
+                postOffice.removeBinding(binding.getUniqueName());
+             }
+
+             bindings.clear();
+
+             firstReset = true;
+          }
+
+          if (!firstReset)
+          {
+             return;
+          }
+
+          Object typeProperty = message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
+
+          if (typeProperty == null)
+          {
+             // The reset marker carries no notification type; calling toString() on the
+             // missing property used to throw and log a spurious error on every reset
+             return;
+          }
+
+          NotificationType type = NotificationType.valueOf(typeProperty.toString());
+
+          if (type == NotificationType.QUEUE_CREATED)
+          {
+             SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
+                                                                                     .generateSimpleStringUUID());
+
+             SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
+
+             SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+             FlowBinding binding = new FlowBindingImpl(queueAddress, uniqueName, queueName, queue, useDuplicateDetection);
+
+             bindings.put(queueName, binding);
+
+             postOffice.addBinding(binding);
+          }
+          else if (type == NotificationType.QUEUE_DESTROYED)
+          {
+             SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+             FlowBinding binding = bindings.remove(queueName);
+
+             if (binding == null)
+             {
+                log.warn("Cannot find flow binding for destroyed queue " + queueName);
+
+                return;
+             }
+
+             postOffice.removeBinding(binding.getUniqueName());
+          }
+          else if (type == NotificationType.CONSUMER_CREATED)
+          {
+             SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+             SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+             FlowBinding binding = bindings.get(queueName);
+
+             if (binding == null)
+             {
+                log.warn("Cannot find flow binding for queue " + queueName + " to add consumer");
+
+                return;
+             }
+
+             binding.addConsumer(filterString);
+          }
+          else if (type == NotificationType.CONSUMER_CLOSED)
+          {
+             SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+             SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+             FlowBinding binding = bindings.get(queueName);
+
+             if (binding == null)
+             {
+                log.warn("Cannot find flow binding for queue " + queueName + " to remove consumer");
+
+                return;
+             }
+
+             binding.removeConsumer(filterString);
+          }
+       }
+       catch (Exception e)
+       {
+          log.error("Failed to handle message", e);
+       }
+    }
+
+ }
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -373,7 +373,8 @@
config.getRetryIntervalMultiplier(),
config.getMaxRetriesBeforeFailover(),
config.getMaxRetriesAfterFailover(),
- config.isUseDuplicateDetection());
+ config.isUseDuplicateDetection(),
+ null);
log.info("put bridge " + this);
bridges.put(config.getName(), bridge);
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingFilter.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -1,122 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBindingFilter
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 21 Jan 2009 20:53:20
- *
- *
- */
-public class FlowBindingFilter implements Filter
-{
- private volatile QueueInfo info;
-
- private final Map<String, Filter> filters = new HashMap<String, Filter>();
-
- public FlowBindingFilter(final QueueInfo info) throws Exception
- {
- this.info = info;
-
- updateInfo(info);
- }
-
- public void updateInfo(final QueueInfo info) throws Exception
- {
- this.info = info;
-
- List<String> filterStrings = info.getFilterStrings();
-
- for (String filterString: filterStrings)
- {
- Filter filter = filters.get(filterString);
-
- if (filter == null)
- {
- filter = new FilterImpl(new SimpleString(filterString));
-
- filters.put(filterString, filter);
- }
- }
-
- //TODO - this can be optimised by storing map of filters in QueueInfo
- if (filterStrings.size() < filters.size())
- {
- Iterator<String> iter = filters.keySet().iterator();
-
- while (iter.hasNext())
- {
- String filterString = iter.next();
-
- if (!filterStrings.contains(filterString))
- {
- iter.remove();
- }
- }
- }
- }
-
- public SimpleString getFilterString()
- {
- return null;
- }
-
- public boolean match(final ServerMessage message)
- {
- if (info.getNumberOfConsumers() == 0)
- {
- return false;
- }
-
- if (filters.isEmpty())
- {
- return true;
- }
- else
- {
- for (Filter filter: filters.values())
- {
- if (filter.match(message))
- {
- return true;
- }
- }
- return false;
- }
- }
-
-}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -0,0 +1,167 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.FlowBinding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A FlowBindingImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 18:55:22
+ *
+ *
+ */
+public class FlowBindingImpl extends BindingImpl implements FlowBinding
+{
+ private final Set<Filter> filters = new HashSet<Filter>();
+
+ private final Map<SimpleString, Integer> filterCounts = new HashMap<SimpleString, Integer>();
+
+ private int consumerCount;
+
+ private final boolean duplicateDetection;
+
+ public FlowBindingImpl(final SimpleString address,
+ final SimpleString uniqueName,
+ final SimpleString routingName,
+ final Bindable bindable,
+ final boolean duplicateDetection)
+ {
+ super(address, uniqueName, routingName, bindable, false, false);
+
+ this.duplicateDetection = duplicateDetection;
+ }
+
+ public boolean accept(final ServerMessage message) throws Exception
+ {
+ if (consumerCount == 0)
+ {
+ return false;
+ }
+
+ boolean accepted = false;
+
+ if (filters.isEmpty())
+ {
+ accepted = true;
+ }
+ else
+ {
+ for (Filter filter : filters)
+ {
+ if (filter.match(message))
+ {
+ accepted = true;
+
+ break;
+ }
+ }
+ }
+
+ if (duplicateDetection && accepted)
+ {
+ if (!message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
+ {
+ //Add the message id as a duplicate id header - this will be detected when routing on the remote node -
+ //any duplicates will be rejected
+ byte[] bytes = new byte[8];
+
+ ByteBuffer buff = ByteBuffer.wrap(bytes);
+
+ buff.putLong(message.getMessageID());
+
+ SimpleString dupID = new SimpleString(bytes);
+
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ }
+ }
+
+ return accepted;
+ }
+
+ public synchronized void addConsumer(final SimpleString filterString) throws Exception
+ {
+ if (filterString != null)
+ {
+ // There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref
+ // count
+
+ Integer i = filterCounts.get(filterString);
+
+ if (i == null)
+ {
+ filterCounts.put(filterString, 0);
+
+ filters.add(new FilterImpl(filterString));
+ }
+ else
+ {
+ filterCounts.put(filterString, i + 1);
+ }
+ }
+
+ consumerCount++;
+ }
+
+ public synchronized void removeConsumer(final SimpleString filterString) throws Exception
+ {
+ if (filterString != null)
+ {
+ Integer i = filterCounts.get(filterString);
+
+ if (i != null)
+ {
+ int ii = i - 1;
+
+ if (ii == 0)
+ {
+ filterCounts.remove(filterString);
+
+ filters.remove(filterString);
+ }
+ else
+ {
+ filterCounts.put(filterString, ii);
+ }
+ }
+ }
+
+ consumerCount--;
+ }
+
+}
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/QueueInfo.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -1,74 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import java.util.List;
-
-/**
- * A QueueInfo
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 21 Jan 2009 20:55:06
- *
- *
- */
-public class QueueInfo
-{
- private final String queueName;
-
- private final String address;
-
- private final List<String> filterStrings;
-
- private final int numberOfConsumers;
-
- public QueueInfo(final String queueName, final String address, final List<String> filterStrings, final int numberOfConsumers)
- {
- this.queueName = queueName;
- this.address = address;
- this.filterStrings = filterStrings;
- this.numberOfConsumers = numberOfConsumers;
- }
-
- public String getQueueName()
- {
- return queueName;
- }
-
- public String getAddress()
- {
- return address;
- }
-
- public List<String> getFilterStrings()
- {
- return filterStrings;
- }
-
- public int getNumberOfConsumers()
- {
- return numberOfConsumers;
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributorImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -53,4 +53,9 @@
{
return !consumers.isEmpty();
}
+
+ public List<Consumer> getConsumers() // returns the consumers field itself; no copy is made here
+ {
+ return consumers;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -393,6 +393,11 @@
{
return distributionPolicy.getConsumerCount();
}
+
+ public synchronized List<Consumer> getConsumers() // snapshot of the distribution policy's consumers
+ {
+ return new ArrayList<Consumer>(distributionPolicy.getConsumers()); // copy, so callers do not share the policy's list
+ }
public synchronized List<MessageReference> list(final Filter filter)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -31,9 +31,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -57,6 +61,7 @@
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
/**
* Concrete implementation of a ClientConsumer.
@@ -127,6 +132,8 @@
private volatile boolean closed;
private final boolean preAcknowledge;
+
+ private final ManagementService managementService;
// Constructors ---------------------------------------------------------------------------------
@@ -141,7 +148,8 @@
final PagingManager pagingManager,
final Channel channel,
final boolean preAcknowledge,
- final Executor executor)
+ final Executor executor,
+ final ManagementService managementService)
{
this.id = id;
@@ -166,6 +174,8 @@
this.preAcknowledge = preAcknowledge;
this.pagingManager = pagingManager;
+
+ this.managementService = managementService;
messageQueue.addConsumer(this);
@@ -184,6 +194,18 @@
{
return doHandle(ref);
}
+
+ public SimpleString getFilterString()
+ {
+ if (filter != null)
+ {
+ return filter.getFilterString();
+ }
+ else
+ {
+ return null;
+ }
+ }
public void handleClose(final Packet packet)
{
@@ -272,6 +294,17 @@
}
tx.rollback();
+
+ if (!browseOnly)
+ {
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, messageQueue.getName());
+
+ Notification notification = new Notification(NotificationType.CONSUMER_CLOSED, props);
+
+ managementService.sendNotification(notification);
+ }
}
public LinkedList<MessageReference> cancelRefs() throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -95,14 +95,7 @@
public MessageReference createReference(final Queue queue)
{
MessageReference ref = new MessageReferenceImpl(this, queue);
-//
-// if (durable && queue.isDurable())
-// {
-// durableRefCount.incrementAndGet();
-// }
-//
-// refCount.incrementAndGet();
-
+
return ref;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -12,6 +12,8 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
@@ -26,11 +28,13 @@
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.client.impl.ClientMessageImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
@@ -94,6 +98,7 @@
import org.jboss.messaging.util.IDGenerator;
import org.jboss.messaging.util.SimpleIDGenerator;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TypedProperties;
/*
* Session implementation
@@ -1392,7 +1397,7 @@
}
}
else
- {
+ {
theQueue = (Queue)binding.getBindable();
}
@@ -1407,9 +1412,26 @@
postOffice.getPagingManager(),
channel,
preAcknowledge,
- executor);
+ executor,
+ managementService);
consumers.put(consumer.getID(), consumer);
+
+ if (!browseOnly)
+ {
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, name);
+
+ if (filterString != null)
+ {
+ props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+ }
+
+ Notification notification = new Notification(CONSUMER_CREATED, props);
+
+ managementService.sendNotification(notification);
+ }
response = new NullResponseMessage();
}
@@ -1478,7 +1500,7 @@
}
postOffice.addBinding(binding);
-
+
if (temporary)
{
// Temporary queue in core simply means the queue will be deleted if
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/NotificationTest.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -125,7 +125,7 @@
// create a queue to receive the management notifications only concerning the destination
SimpleString notifQueue = randomSimpleString();
- SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_MESSAGE + " LIKE '%" + destinationName + "%'" );
+ SimpleString filter = new SimpleString(ManagementHelper.HDR_ADDRESS + " LIKE '%" + destinationName + "%'" );
System.out.println(filter);
session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueue, filter, false, true);
ClientConsumer notifConsumer = session.createConsumer(notifQueue);
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -39,7 +39,6 @@
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.SimpleString;
-
/**
*
* A FakePostOffice
@@ -54,12 +53,12 @@
private ConcurrentHashSet<SimpleString> addresses = new ConcurrentHashSet<SimpleString>();
private volatile boolean started;
-
+
public void addBinding(Binding binding) throws Exception
{
bindings.put(binding.getAddress(), binding);
}
-
+
public void route(ServerMessage message, Transaction tx) throws Exception
{
}
@@ -148,4 +147,9 @@
return null;
}
+ public void sendQueueInfoToQueue(SimpleString queueName) throws Exception // no-op: this fake has an empty body
+ {
+ }
+
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -589,6 +589,11 @@
class NullConsumer implements Consumer
{
+ public SimpleString getFilterString() // NullConsumer always reports null as its filter string
+ {
+ return null;
+ }
+
public HandleStatus handle(MessageReference reference)
{
return null;
@@ -1399,6 +1404,11 @@
class DummyDistributionPolicy implements Distributor
{
+ public List<Consumer> getConsumers() // this dummy always returns null rather than a list
+ {
+ return null;
+ }
+
Consumer consumer;
public Consumer select(ServerMessage message, boolean redeliver)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java 2009-01-23 08:27:25 UTC (rev 5701)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java 2009-01-23 13:28:05 UTC (rev 5702)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.tests.unit.core.server.impl.fakes;
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -40,69 +41,75 @@
public class FakeConsumer implements Consumer
{
private HandleStatus statusToReturn = HandleStatus.HANDLED;
-
+
private HandleStatus newStatus;
-
+
private int delayCountdown = 0;
-
+
private LinkedList<MessageReference> references = new LinkedList<MessageReference>();
-
+
private Filter filter;
-
+
public FakeConsumer()
- {
+ {
}
-
+
public FakeConsumer(Filter filter)
{
this.filter = filter;
}
-
+
+ /**
+  * @return the filter string of the filter this fake was constructed with, or null when it was built
+  *         without a filter
+  */
+ public SimpleString getFilterString()
+ {
+    return filter == null ? null : filter.getFilterString();
+ }
+
public synchronized MessageReference waitForNextReference(long timeout)
{
while (references.isEmpty() && timeout > 0)
{
- long start = System.currentTimeMillis();
+ long start = System.currentTimeMillis();
try
{
wait();
}
catch (InterruptedException e)
- {
+ {
}
timeout -= (System.currentTimeMillis() - start);
}
-
+
if (timeout <= 0)
{
throw new IllegalStateException("Timed out waiting for reference");
}
-
+
return references.removeFirst();
}
-
+
public synchronized void setStatusImmediate(HandleStatus newStatus)
{
this.statusToReturn = newStatus;
}
-
+
public synchronized void setStatusDelayed(HandleStatus newStatus, int numReferences)
{
this.newStatus = newStatus;
-
+
this.delayCountdown = numReferences;
}
-
+
public synchronized List<MessageReference> getReferences()
{
return references;
}
-
+
public synchronized void clearReferences()
{
this.references.clear();
}
-
+
public synchronized HandleStatus handle(MessageReference reference)
{
if (filter != null)
@@ -112,7 +119,7 @@
references.addLast(reference);
reference.getQueue().referenceHandled();
notify();
-
+
return HandleStatus.HANDLED;
}
else
@@ -120,28 +127,28 @@
return HandleStatus.NO_MATCH;
}
}
-
+
if (newStatus != null)
- {
+ {
if (delayCountdown == 0)
{
statusToReturn = newStatus;
-
+
newStatus = null;
}
else
- {
+ {
delayCountdown--;
}
}
-
+
if (statusToReturn == HandleStatus.HANDLED)
{
reference.getQueue().referenceHandled();
references.addLast(reference);
notify();
}
-
+
return statusToReturn;
}
}
More information about the jboss-cvs-commits
mailing list