[jboss-cvs] JBoss Messaging SVN: r7479 - in branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice: impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 26 05:47:33 EDT 2009
Author: timfox
Date: 2009-06-26 05:47:32 -0400 (Fri, 26 Jun 2009)
New Revision: 7479
Added:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
Log:
mt replication commit 2
Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java 2009-06-26 09:47:32 UTC (rev 7479)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A ClusterQueueStateManager
+ *
+ * A messaging component that exposes the lock used around its notification handling and
+ * can send the queue information it holds directly to a named queue.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public interface ClusterQueueStateManager extends MessagingComponent
+{
+ /**
+ * Returns the object callers can synchronize on to coordinate with notification handling.
+ * ClusterQueueStateManagerImpl returns itself, which is the monitor it holds while
+ * processing notifications and while sending queue info.
+ *
+ * @return the notification lock object
+ */
+ Object getNotificationLock();
+
+ /**
+ * Sends the queue information held by this manager to the given queue.
+ * ClusterQueueStateManagerImpl sends a reset message first, then, for every queue info whose
+ * address starts with the given address, a BINDING_ADDED message followed by one
+ * CONSUMER_CREATED message per consumer.
+ *
+ * @param queueName name of the queue that receives the messages
+ * @param address address prefix used to select which queue infos are sent
+ * @throws Exception if sending fails; ClusterQueueStateManagerImpl throws IllegalStateException
+ * when no binding exists for queueName
+ */
+ void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
+}
Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java 2009-06-26 09:47:32 UTC (rev 7479)
@@ -0,0 +1,442 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationListener;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.BindingType;
+import org.jboss.messaging.core.postoffice.ClusterQueueStateManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueInfo;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.utils.ExecutorFactory;
+import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.TypedProperties;
+import org.jboss.messaging.utils.UUIDGenerator;
+
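+/**
+ * A ClusterQueueStateManagerImpl
+ *
+ * Maintains a map of {@link QueueInfo} keyed by cluster name, updated from management
+ * notifications, and can replay that state to a queue as a sequence of
+ * notification-style messages.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */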
+public class ClusterQueueStateManagerImpl implements ClusterQueueStateManager, NotificationListener
+{
+ public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
+
+ private final Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
+
+ private final PostOffice postOffice;
+
+ private final StorageManager storageManager;
+
+ private final ExecutorFactory redistributorExecutorFactory;
+
+ private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+ private final ManagementService managementService;
+
+ private boolean started;
+
+   /**
+    * Creates a manager wired to its collaborators. Nothing is registered with the
+    * management service until {@link #start()} is called.
+    *
+    * @param redistributorExecutorFactory supplies the executors handed to queues when a redistributor is added
+    * @param addressSettingsRepository looked up by address to obtain the redistribution delay
+    * @param postOffice used to resolve bindings by queue name
+    * @param storageManager used to generate message ids
+    * @param managementService the service this manager registers with as a notification listener
+    */
+   public ClusterQueueStateManagerImpl(final ExecutorFactory redistributorExecutorFactory,
+                                       final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                                       final PostOffice postOffice,
+                                       final StorageManager storageManager,
+                                       final ManagementService managementService)
+   {
+      this.postOffice = postOffice;
+      this.storageManager = storageManager;
+      this.managementService = managementService;
+      this.addressSettingsRepository = addressSettingsRepository;
+      this.redistributorExecutorFactory = redistributorExecutorFactory;
+   }
+
+   /**
+    * Registers this manager as a notification listener with the management service.
+    * Calling it again while already started has no effect.
+    */
+   public synchronized void start()
+   {
+      if (!started)
+      {
+         managementService.addNotificationListener(this);
+
+         started = true;
+      }
+   }
+
+   /**
+    * Unregisters this manager from the management service.
+    * Calling it while not started has no effect.
+    */
+   public synchronized void stop()
+   {
+      if (started)
+      {
+         managementService.removeNotificationListener(this);
+
+         started = false;
+      }
+   }
+
+ /**
+ * Reports whether this manager is started, i.e. start() has been called without a
+ * subsequent stop().
+ *
+ * @return true if started
+ */
+ public synchronized boolean isStarted()
+ {
+ return started;
+ }
+
+ /**
+ * Returns this instance, which is the monitor used by the synchronized methods of this
+ * class and by the synchronized block in sendQueueInfoToQueue.
+ *
+ * @return this object
+ */
+ public Object getNotificationLock()
+ {
+ return this;
+ }
+
+   /**
+    * Applies a management notification to the locally held {@link QueueInfo} map.
+    * <ul>
+    * <li>BINDING_ADDED: stores a new QueueInfo under its cluster name; divert bindings are ignored.</li>
+    * <li>BINDING_REMOVED: removes the QueueInfo for the cluster name.</li>
+    * <li>CONSUMER_CREATED: increments the consumer count and records the consumer's filter string, if any.
+    * When the distance is greater than zero and a binding with the routing name exists in the post office,
+    * a redistributor is added to that queue.</li>
+    * <li>CONSUMER_CLOSED: decrements the consumer count and forgets the consumer's filter string, if any.
+    * When no consumers remain and the distance is zero, a redistributor is added to the queue.</li>
+    * <li>SECURITY_AUTHENTICATION_VIOLATION, SECURITY_PERMISSION_VIOLATION: ignored.</li>
+    * </ul>
+    *
+    * @param notification the notification to apply
+    * @throws IllegalArgumentException if a BINDING_ADDED notification carries no binding type, or the
+    *            notification type is not one of those listed above
+    * @throws IllegalStateException if a required property is missing, or the referenced queue info or
+    *            queue cannot be found
+    */
+   public synchronized void onNotification(final Notification notification)
+   {
+      NotificationType type = notification.getType();
+
+      switch (type)
+      {
+         case BINDING_ADDED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            Integer bindingType = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_TYPE);
+
+            if (bindingType == null)
+            {
+               throw new IllegalArgumentException("Binding type not specified");
+            }
+
+            if (bindingType == BindingType.DIVERT_INDEX)
+            {
+               // We don't propagate diverts
+               return;
+            }
+
+            SimpleString routingName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
+
+            Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
+
+            QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
+
+            queueInfos.put(clusterName, info);
+
+            break;
+         }
+         case BINDING_REMOVED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            if (clusterName == null)
+            {
+               throw new IllegalStateException("No cluster name");
+            }
+
+            QueueInfo info = queueInfos.remove(clusterName);
+
+            if (info == null)
+            {
+               throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+            }
+
+            break;
+         }
+         case CONSUMER_CREATED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            if (clusterName == null)
+            {
+               throw new IllegalStateException("No cluster name");
+            }
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            QueueInfo info = queueInfos.get(clusterName);
+
+            if (info == null)
+            {
+               throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+            }
+
+            info.incrementConsumers();
+
+            if (filterString != null)
+            {
+               List<SimpleString> filterStrings = info.getFilterStrings();
+
+               if (filterStrings == null)
+               {
+                  // The list is created lazily on the first consumer that has a filter
+                  filterStrings = new ArrayList<SimpleString>();
+
+                  info.setFilterStrings(filterStrings);
+               }
+
+               filterStrings.add(filterString);
+            }
+
+            Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
+
+            if (distance == null)
+            {
+               throw new IllegalStateException("No distance");
+            }
+
+            if (distance > 0)
+            {
+               SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+               if (queueName == null)
+               {
+                  throw new IllegalStateException("No queue name");
+               }
+
+               Binding binding = postOffice.getBinding(queueName);
+
+               if (binding != null)
+               {
+                  // We have a local queue
+                  addRedistributorUnlessDelayUnset(binding);
+               }
+            }
+
+            break;
+         }
+         case CONSUMER_CLOSED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            if (clusterName == null)
+            {
+               // Message previously read "No distance", which pointed at the wrong property
+               throw new IllegalStateException("No cluster name");
+            }
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            QueueInfo info = queueInfos.get(clusterName);
+
+            if (info == null)
+            {
+               throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+            }
+
+            info.decrementConsumers();
+
+            if (filterString != null)
+            {
+               List<SimpleString> filterStrings = info.getFilterStrings();
+
+               // The list only exists once a filtered consumer has been recorded; a null list is
+               // treated like an empty one rather than failing with a NullPointerException
+               if (filterStrings != null)
+               {
+                  filterStrings.remove(filterString);
+               }
+            }
+
+            if (info.getNumberOfConsumers() == 0)
+            {
+               Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
+
+               if (distance == null)
+               {
+                  // Message previously read "No cluster name", which pointed at the wrong property
+                  throw new IllegalStateException("No distance");
+               }
+
+               if (distance == 0)
+               {
+                  SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+                  if (queueName == null)
+                  {
+                     throw new IllegalStateException("No queue name");
+                  }
+
+                  Binding binding = postOffice.getBinding(queueName);
+
+                  if (binding == null)
+                  {
+                     throw new IllegalStateException("No queue " + queueName);
+                  }
+
+                  addRedistributorUnlessDelayUnset(binding);
+               }
+            }
+
+            break;
+         }
+         case SECURITY_AUTHENTICATION_VIOLATION:
+         case SECURITY_PERMISSION_VIOLATION:
+            break;
+         default:
+         {
+            throw new IllegalArgumentException("Invalid type " + type);
+         }
+
+      }
+   }
+
+   /**
+    * Adds a redistributor to the queue behind the given binding, using the redistribution delay
+    * configured for the binding's address, unless that delay is -1.
+    *
+    * @param binding a binding whose bindable is a {@link Queue}
+    */
+   private void addRedistributorUnlessDelayUnset(final Binding binding)
+   {
+      Queue queue = (Queue)binding.getBindable();
+
+      AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+
+      long redistributionDelay = addressSettings.getRedistributionDelay();
+
+      if (redistributionDelay != -1)
+      {
+         queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+      }
+   }
+
+   /**
+    * Replays the tracked queue state to the named queue: first a reset message, then for every
+    * {@link QueueInfo} whose address starts with the given address a BINDING_ADDED message
+    * followed by one CONSUMER_CREATED message per consumer (consumers with a filter carry their
+    * filter string).
+    *
+    * @param queueName name of the queue the messages are routed to
+    * @param address address prefix selecting which queue infos are sent
+    * @throws IllegalStateException if no binding exists for queueName
+    * @throws Exception if routing a message fails
+    */
+   public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
+   {
+      // We route straight to the queue so the messages land on the same queue that is bound to the
+      // notifications address - this is crucial for ensuring that queue infos and notifications are
+      // received as one contiguous, consistent stream
+      final Binding targetBinding = postOffice.getBinding(queueName);
+
+      if (targetBinding == null)
+      {
+         throw new IllegalStateException("Cannot find queue " + queueName);
+      }
+
+      final Queue targetQueue = (Queue)targetBinding.getBindable();
+
+      // Hold the lock so queue info and notifications stay in the correct order with no gaps
+      synchronized (this)
+      {
+         // The reset message goes first and is routed without consulting the queue's filter
+         ServerMessage resetMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+         resetMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
+         resetMessage.setDestination(queueName);
+         resetMessage.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
+         targetQueue.preroute(resetMessage, null);
+         targetQueue.route(resetMessage, null);
+
+         for (QueueInfo queueInfo : queueInfos.values())
+         {
+            if (!queueInfo.getAddress().startsWith(address))
+            {
+               continue;
+            }
+
+            ServerMessage bindingAdded = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
+
+            bindingAdded.putStringProperty(ManagementHelper.HDR_ADDRESS, queueInfo.getAddress());
+            bindingAdded.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, queueInfo.getClusterName());
+            bindingAdded.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, queueInfo.getRoutingName());
+            bindingAdded.putIntProperty(ManagementHelper.HDR_BINDING_ID, queueInfo.getID());
+            bindingAdded.putStringProperty(ManagementHelper.HDR_FILTERSTRING, queueInfo.getFilterString());
+            bindingAdded.putIntProperty(ManagementHelper.HDR_DISTANCE, queueInfo.getDistance());
+
+            routeDirect(targetQueue, bindingAdded);
+
+            final List<SimpleString> consumerFilters = queueInfo.getFilterStrings();
+
+            final int filteredConsumers = consumerFilters == null ? 0 : consumerFilters.size();
+
+            final int unfilteredConsumers = queueInfo.getNumberOfConsumers() - filteredConsumers;
+
+            // One CONSUMER_CREATED per consumer that has no filter
+            for (int sent = 0; sent < unfilteredConsumers; sent++)
+            {
+               ServerMessage consumerCreated = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+               consumerCreated.putStringProperty(ManagementHelper.HDR_ADDRESS, queueInfo.getAddress());
+               consumerCreated.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, queueInfo.getClusterName());
+               consumerCreated.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, queueInfo.getRoutingName());
+               consumerCreated.putIntProperty(ManagementHelper.HDR_DISTANCE, queueInfo.getDistance());
+
+               routeDirect(targetQueue, consumerCreated);
+            }
+
+            if (consumerFilters == null)
+            {
+               continue;
+            }
+
+            // One CONSUMER_CREATED per consumer that has a filter, carrying that filter string
+            for (SimpleString consumerFilter : consumerFilters)
+            {
+               ServerMessage consumerCreated = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+               consumerCreated.putStringProperty(ManagementHelper.HDR_ADDRESS, queueInfo.getAddress());
+               consumerCreated.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, queueInfo.getClusterName());
+               consumerCreated.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, queueInfo.getRoutingName());
+               consumerCreated.putStringProperty(ManagementHelper.HDR_FILTERSTRING, consumerFilter);
+               consumerCreated.putIntProperty(ManagementHelper.HDR_DISTANCE, queueInfo.getDistance());
+
+               routeDirect(targetQueue, consumerCreated);
+            }
+         }
+      }
+   }
+
+   /**
+    * Routes the message to the queue, but only if the queue has no filter or its filter
+    * matches the message; otherwise the message is dropped.
+    *
+    * @param queue the queue to route to
+    * @param message the message to route
+    * @throws Exception if prerouting or routing fails
+    */
+   private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
+   {
+      if (queue.getFilter() != null && !queue.getFilter().match(message))
+      {
+         // Rejected by the queue's filter
+         return;
+      }
+
+      queue.preroute(message, null);
+      queue.route(message, null);
+   }
+
+ private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
+ {
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+ message.setBody(ChannelBuffers.EMPTY_BUFFER);
+
+ message.setDestination(queueName);
+
+ String uid = UUIDGenerator.getInstance().generateStringUUID();
+
+ message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
+ message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+ message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
+
+ return message;
+ }
+}
More information about the jboss-cvs-commits
mailing list