[jboss-cvs] JBoss Messaging SVN: r7479 - in branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jun 26 05:47:33 EDT 2009


Author: timfox
Date: 2009-06-26 05:47:32 -0400 (Fri, 26 Jun 2009)
New Revision: 7479

Added:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
Log:
mt replication commit 2

Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/ClusterQueueStateManager.java	2009-06-26 09:47:32 UTC (rev 7479)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A ClusterQueueStateManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public interface ClusterQueueStateManager extends MessagingComponent
+{
+   Object getNotificationLock();
+   
+   void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
+}

Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/ClusterQueueStateManagerImpl.java	2009-06-26 09:47:32 UTC (rev 7479)
@@ -0,0 +1,442 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.Notification;
+import org.jboss.messaging.core.management.NotificationListener;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.BindingType;
+import org.jboss.messaging.core.postoffice.ClusterQueueStateManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueInfo;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.utils.ExecutorFactory;
+import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.TypedProperties;
+import org.jboss.messaging.utils.UUIDGenerator;
+
+/**
+ * A ClusterQueueStateManagerImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ClusterQueueStateManagerImpl implements ClusterQueueStateManager, NotificationListener
+{
+   public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
+
+   private final Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
+
+   private final PostOffice postOffice;
+
+   private final StorageManager storageManager;
+
+   private final ExecutorFactory redistributorExecutorFactory;
+
+   private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+   private final ManagementService managementService;
+
+   private boolean started;
+
+   public ClusterQueueStateManagerImpl(final ExecutorFactory redistributorExecutorFactory,
+                                       final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                                       final PostOffice postOffice,
+                                       final StorageManager storageManager,
+                                       final ManagementService managementService)
+   {
+      this.redistributorExecutorFactory = redistributorExecutorFactory;
+
+      this.addressSettingsRepository = addressSettingsRepository;
+
+      this.postOffice = postOffice;
+
+      this.storageManager = storageManager;
+
+      this.managementService = managementService;
+   }
+
+   public synchronized void start()
+   {
+      if (started)
+      {
+         return;
+      }
+
+      managementService.addNotificationListener(this);
+
+      started = true;
+   }
+
+   public synchronized void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      managementService.removeNotificationListener(this);
+
+      started = false;
+   }
+
+   public synchronized boolean isStarted()
+   {
+      return started;
+   }
+
+   public Object getNotificationLock()
+   {
+      return this;
+   }
+
+   public synchronized void onNotification(final Notification notification)
+   {
+      NotificationType type = notification.getType();
+
+      switch (type)
+      {
+         case BINDING_ADDED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            Integer bindingType = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_TYPE);
+
+            if (bindingType == null)
+            {
+               throw new IllegalArgumentException("Binding type not specified");
+            }
+
+            if (bindingType == BindingType.DIVERT_INDEX)
+            {
+               // We don't propagate diverts
+               return;
+            }
+
+            SimpleString routingName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
+
+            Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
+
+            QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
+
+            queueInfos.put(clusterName, info);
+
+            break;
+         }
+         case BINDING_REMOVED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            if (clusterName == null)
+            {
+               throw new IllegalStateException("No cluster name");
+            }
+
+            QueueInfo info = queueInfos.remove(clusterName);
+
+            if (info == null)
+            {
+               throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+            }
+
+            break;
+         }
+         case CONSUMER_CREATED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            if (clusterName == null)
+            {
+               throw new IllegalStateException("No cluster name");
+            }
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            QueueInfo info = queueInfos.get(clusterName);
+
+            if (info == null)
+            {
+               throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+            }
+
+            info.incrementConsumers();
+
+            if (filterString != null)
+            {
+               List<SimpleString> filterStrings = info.getFilterStrings();
+
+               if (filterStrings == null)
+               {
+                  filterStrings = new ArrayList<SimpleString>();
+
+                  info.setFilterStrings(filterStrings);
+               }
+
+               filterStrings.add(filterString);
+            }
+
+            Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
+
+            if (distance == null)
+            {
+               throw new IllegalStateException("No distance");
+            }
+
+            if (distance > 0)
+            {
+               SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+               if (queueName == null)
+               {
+                  throw new IllegalStateException("No queue name");
+               }
+
+               Binding binding = postOffice.getBinding(queueName);
+
+               if (binding != null)
+               {
+                  // We have a local queue
+                  Queue queue = (Queue)binding.getBindable();
+
+                  AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+
+                  long redistributionDelay = addressSettings.getRedistributionDelay();
+
+                  if (redistributionDelay != -1)
+                  {
+                     queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+                  }
+               }
+            }
+
+            break;
+         }
+         case CONSUMER_CLOSED:
+         {
+            TypedProperties props = notification.getProperties();
+
+            SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
+
+            if (clusterName == null)
+            {
+               throw new IllegalStateException("No distance");
+            }
+
+            SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+            QueueInfo info = queueInfos.get(clusterName);
+
+            if (info == null)
+            {
+               throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
+            }
+
+            info.decrementConsumers();
+
+            if (filterString != null)
+            {
+               List<SimpleString> filterStrings = info.getFilterStrings();
+
+               filterStrings.remove(filterString);
+            }
+
+            if (info.getNumberOfConsumers() == 0)
+            {
+               Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
+
+               if (distance == null)
+               {
+                  throw new IllegalStateException("No cluster name");
+               }
+
+               if (distance == 0)
+               {
+                  SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+                  if (queueName == null)
+                  {
+                     throw new IllegalStateException("No queue name");
+                  }
+
+                  Binding binding = postOffice.getBinding(queueName);
+
+                  if (binding == null)
+                  {
+                     throw new IllegalStateException("No queue " + queueName);
+                  }
+
+                  Queue queue = (Queue)binding.getBindable();
+
+                  AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress().toString());
+
+                  long redistributionDelay = addressSettings.getRedistributionDelay();
+
+                  if (redistributionDelay != -1)
+                  {
+                     queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+                  }
+               }
+            }
+
+            break;
+         }
+         case SECURITY_AUTHENTICATION_VIOLATION:
+         case SECURITY_PERMISSION_VIOLATION:
+            break;
+         default:
+         {
+            throw new IllegalArgumentException("Invalid type " + type);
+         }
+
+      }
+   }
+
+   public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
+   {
+      // We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
+      // this is crucial for ensuring
+      // that queue infos and notifications are received in a contiguous consistent stream
+      Binding binding = postOffice.getBinding(queueName);
+
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find queue " + queueName);
+      }
+
+      Queue queue = (Queue)binding.getBindable();
+
+      // Need to lock to make sure all queue info and notifications are in the correct order with no gaps
+      synchronized (this)
+      {
+         // First send a reset message
+
+         ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+         message.setBody(ChannelBuffers.EMPTY_BUFFER);
+         message.setDestination(queueName);
+         message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
+         queue.preroute(message, null);
+         queue.route(message, null);
+
+         for (QueueInfo info : queueInfos.values())
+         {
+            if (info.getAddress().startsWith(address))
+            {
+               message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
+
+               message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+               message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
+               message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
+               message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+               message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
+               message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
+
+               routeDirect(queue, message);
+
+               int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
+
+               for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
+               {
+                  message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+                  message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+                  message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
+                  message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
+                  message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
+
+                  routeDirect(queue, message);
+               }
+
+               if (info.getFilterStrings() != null)
+               {
+                  for (SimpleString filterString : info.getFilterStrings())
+                  {
+                     message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+                     message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+                     message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
+                     message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
+                     message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+                     message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
+
+                     routeDirect(queue, message);
+                  }
+               }
+            }
+         }
+      }
+
+   }
+
+   private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
+   {
+      if (queue.getFilter() == null || queue.getFilter().match(message))
+      {
+         queue.preroute(message, null);
+         queue.route(message, null);
+      }
+   }
+
+   private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
+   {
+      ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+      message.setBody(ChannelBuffers.EMPTY_BUFFER);
+
+      message.setDestination(queueName);
+
+      String uid = UUIDGenerator.getInstance().generateStringUUID();
+
+      message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
+      message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+      message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
+
+      return message;
+   }
+}




More information about the jboss-cvs-commits mailing list