[hornetq-commits] JBoss hornetq SVN: r9378 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jul 1 14:00:36 EDT 2010


Author: timfox
Date: 2010-07-01 14:00:35 -0400 (Thu, 01 Jul 2010)
New Revision: 9378

Added:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
Log:
HA improvements

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java	2010-07-01 18:00:35 UTC (rev 9378)
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Wire-format packet of type {@link PacketImpl#SUBSCRIBE_TOPOLOGY}.
+ * <p>
+ * The packet body is a single boolean, {@code clusterConnection}.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl
+{
+   private static final Logger log = Logger.getLogger(SubscribeClusterTopologyUpdatesMessage.class);
+
+   /** Sole payload of the packet; not final because decoding assigns it. */
+   private boolean clusterConnection;
+
+   /** Builds a packet whose flag starts out as {@code false}. */
+   public SubscribeClusterTopologyUpdatesMessage()
+   {
+      this(false);
+   }
+
+   /**
+    * Builds a packet carrying the given flag.
+    *
+    * @param clusterConnection value that {@link #encodeRest} writes to the wire
+    */
+   public SubscribeClusterTopologyUpdatesMessage(final boolean clusterConnection)
+   {
+      super(PacketImpl.SUBSCRIBE_TOPOLOGY);
+
+      this.clusterConnection = clusterConnection;
+   }
+
+   /**
+    * Replaces the flag with one boolean read from the buffer.
+    * @param buffer buffer to read from
+    */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      clusterConnection = buffer.readBoolean();
+   }
+
+   /**
+    * Writes the flag to the buffer as one boolean.
+    * @param buffer buffer to write to
+    */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeBoolean(clusterConnection);
+   }
+
+   /** @return the flag as passed to the constructor or as last decoded */
+   public boolean isClusterConnection()
+   {
+      return clusterConnection;
+   }
+}

Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	                        (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2010-07-01 18:00:35 UTC (rev 9378)
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.ManagementHelper;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.Transformer;
+import org.hornetq.utils.UUID;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A ClusterConnectionBridge
+ *
+ * @author tim
+ *
+ *
+ */
+public class ClusterConnectionBridge extends BridgeImpl
+{
+   private static final Logger log = Logger.getLogger(ClusterConnectionBridge.class);
+
+   private final MessageFlowRecord flowRecord;
+
+   private final SimpleString managementAddress;
+
+   private final SimpleString managementNotificationAddress;
+
+   private ClientConsumer notifConsumer;
+
+   private final SimpleString idsHeaderName;
+   
+   private final TransportConfiguration connector;
+
+   public ClusterConnectionBridge(final ServerLocator serverLocator,
+                                  final UUID nodeUUID,
+                                  final SimpleString name,
+                                  final Queue queue,
+                                  final Executor executor,
+                                  final SimpleString filterString,
+                                  final SimpleString forwardingAddress,
+                                  final ScheduledExecutorService scheduledExecutor,
+                                  final Transformer transformer,
+                                  final boolean useDuplicateDetection,
+                                  final String user,
+                                  final String password,
+                                  final boolean activated,
+                                  final StorageManager storageManager,
+                                  final SimpleString managementAddress,
+                                  final SimpleString managementNotificationAddress,
+                                  final MessageFlowRecord flowRecord,
+                                  final TransportConfiguration connector) throws Exception
+   {
+      super(serverLocator,
+            nodeUUID,
+            name,
+            queue,
+            executor,
+            filterString,
+            forwardingAddress,
+            scheduledExecutor,
+            transformer,
+            useDuplicateDetection,
+            user,
+            password,
+            activated,
+            storageManager);
+
+      idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+
+      this.managementAddress = managementAddress;
+      this.managementNotificationAddress = managementNotificationAddress;
+      this.flowRecord = flowRecord;
+      this.connector = connector;
+   }
+
+   @Override
+   protected ServerMessage beforeForward(ServerMessage message)
+   {
+      // We make a copy of the message, then we strip out the unwanted routing id headers and leave
+      // only
+      // the one pertinent for the address node - this is important since different queues on different
+      // nodes could have same queue ids
+      // Note we must copy since same message may get routed to other nodes which require different headers
+      message = message.copy();
+
+      // TODO - we can optimise this
+
+      Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+
+      byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+      for (SimpleString propName : propNames)
+      {
+         if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+         {
+            message.removeProperty(propName);
+         }
+      }
+
+      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+
+      message = super.beforeForward(message);
+
+      return message;
+   }
+
+   private void setupNotificationConsumer() throws Exception
+   {
+      if (flowRecord != null)
+      {
+         flowRecord.reset();
+
+         if (notifConsumer != null)
+         {
+            try
+            {
+               notifConsumer.close();
+
+               notifConsumer = null;
+            }
+            catch (HornetQException e)
+            {
+               log.error("Failed to close consumer", e);
+            }
+         }
+
+         // Get the queue data
+
+         String qName = "notif." + UUIDGenerator.getInstance().generateStringUUID();
+
+         SimpleString notifQueueName = new SimpleString(qName);
+
+         SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
+                                                BindingType.DIVERT.toInt() +
+                                                " AND " +
+                                                ManagementHelper.HDR_NOTIFICATION_TYPE +
+                                                " IN ('" +
+                                                NotificationType.BINDING_ADDED +
+                                                "','" +
+                                                NotificationType.BINDING_REMOVED +
+                                                "','" +
+                                                NotificationType.CONSUMER_CREATED +
+                                                "','" +
+                                                NotificationType.CONSUMER_CLOSED +
+                                                "','" +
+                                                NotificationType.PROPOSAL +
+                                                "','" +
+                                                NotificationType.PROPOSAL_RESPONSE +
+                                                "') AND " +
+                                                ManagementHelper.HDR_DISTANCE +
+                                                "<" +
+                                                flowRecord.getMaxHops() +
+                                                " AND (" +
+                                                ManagementHelper.HDR_ADDRESS +
+                                                " LIKE '" +
+                                                flowRecord.getAddress() +
+                                                "%')");
+
+         session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
+
+         notifConsumer = session.createConsumer(notifQueueName);
+
+         notifConsumer.setMessageHandler(flowRecord);
+
+         session.start();
+
+         ClientMessage message = session.createMessage(false);
+
+         ManagementHelper.putOperationInvocation(message,
+                                                 ResourceNames.CORE_SERVER,
+                                                 "sendQueueInfoToQueue",
+                                                 notifQueueName.toString(),
+                                                 flowRecord.getAddress());
+
+         ClientProducer prod = session.createProducer(managementAddress);
+
+         prod.send(message);
+      }
+   }
+
+   @Override
+   protected void afterConnect() throws Exception
+   {
+      setupNotificationConsumer();
+   }
+   
+   @Override
+   protected ClientSessionFactory createSessionFactory() throws Exception
+   {
+      //We create the session factory using the specified connector
+      
+      return serverLocator.createSessionFactory(connector);      
+   }
+
+}



More information about the hornetq-commits mailing list