From do-not-reply at jboss.org Thu Jul 1 14:00:36 2010
Content-Type: multipart/mixed; boundary="===============7404688427346820355=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r9378 - in
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl
and 1 other directory.
Date: Thu, 01 Jul 2010 14:00:36 -0400
Message-ID: <201007011800.o61I0aQf022749@svn01.web.mwc.hst.phx2.redhat.com>
--===============7404688427346820355==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: timfox
Date: 2010-07-01 14:00:35 -0400 (Thu, 01 Jul 2010)
New Revision: 9378
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/i=
mpl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/=
impl/ClusterConnectionBridge.java
Log:
HA improvements
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/co=
re/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/=
impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java =
(rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/=
impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2010-07-01 18:0=
0:35 UTC (rev 9378)
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author Tim Fox
+ *
+ */
+public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log =3D Logger.getLogger(SubscribeClusterTo=
pologyUpdatesMessage.class);
+
+ // Attributes ----------------------------------------------------
+
+ private boolean clusterConnection;
+ =
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SubscribeClusterTopologyUpdatesMessage(final boolean clusterConn=
ection)
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY);
+
+ this.clusterConnection =3D clusterConnection;
+ }
+
+ public SubscribeClusterTopologyUpdatesMessage()
+ {
+ super(PacketImpl.SUBSCRIBE_TOPOLOGY);
+ }
+
+ // Public --------------------------------------------------------
+
+ public boolean isClusterConnection()
+ {
+ return clusterConnection;
+ }
+ =
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeBoolean(clusterConnection); =
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ clusterConnection =3D buffer.readBoolean();
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/clus=
ter/impl/ClusterConnectionBridge.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster=
/impl/ClusterConnectionBridge.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster=
/impl/ClusterConnectionBridge.java 2010-07-01 18:00:35 UTC (rev 9378)
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.ManagementHelper;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.Transformer;
+import org.hornetq.utils.UUID;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A ClusterConnectionBridge
+ *
+ * @author tim
+ *
+ *
+ */
+public class ClusterConnectionBridge extends BridgeImpl
+{
+ private static final Logger log =3D Logger.getLogger(ClusterConnectionB=
ridge.class);
+
+ private final MessageFlowRecord flowRecord;
+
+ private final SimpleString managementAddress;
+
+ private final SimpleString managementNotificationAddress;
+
+ private ClientConsumer notifConsumer;
+
+ private final SimpleString idsHeaderName;
+ =
+ private final TransportConfiguration connector;
+
+ public ClusterConnectionBridge(final ServerLocator serverLocator,
+ final UUID nodeUUID,
+ final SimpleString name,
+ final Queue queue,
+ final Executor executor,
+ final SimpleString filterString,
+ final SimpleString forwardingAddress,
+ final ScheduledExecutorService scheduled=
Executor,
+ final Transformer transformer,
+ final boolean useDuplicateDetection,
+ final String user,
+ final String password,
+ final boolean activated,
+ final StorageManager storageManager,
+ final SimpleString managementAddress,
+ final SimpleString managementNotificatio=
nAddress,
+ final MessageFlowRecord flowRecord,
+ final TransportConfiguration connector) =
throws Exception
+ {
+ super(serverLocator,
+ nodeUUID,
+ name,
+ queue,
+ executor,
+ filterString,
+ forwardingAddress,
+ scheduledExecutor,
+ transformer,
+ useDuplicateDetection,
+ user,
+ password,
+ activated,
+ storageManager);
+
+ idsHeaderName =3D MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+
+ this.managementAddress =3D managementAddress;
+ this.managementNotificationAddress =3D managementNotificationAddress;
+ this.flowRecord =3D flowRecord;
+ this.connector =3D connector;
+ }
+
+ @Override
+ protected ServerMessage beforeForward(ServerMessage message)
+ {
+ // We make a copy of the message, then we strip out the unwanted rou=
ting id headers and leave
+ // only
+ // the one pertinent for the address node - this is important since =
different queues on different
+ // nodes could have same queue ids
+ // Note we must copy since same message may get routed to other node=
s which require different headers
+ message =3D message.copy();
+
+ // TODO - we can optimise this
+
+ Set propNames =3D new HashSet(message.ge=
tPropertyNames());
+
+ byte[] queueIds =3D message.getBytesProperty(idsHeaderName);
+
+ for (SimpleString propName : propNames)
+ {
+ if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+ {
+ message.removeProperty(propName);
+ }
+ }
+
+ message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+
+ message =3D super.beforeForward(message);
+
+ return message;
+ }
+
+ private void setupNotificationConsumer() throws Exception
+ {
+ if (flowRecord !=3D null)
+ {
+ flowRecord.reset();
+
+ if (notifConsumer !=3D null)
+ {
+ try
+ {
+ notifConsumer.close();
+
+ notifConsumer =3D null;
+ }
+ catch (HornetQException e)
+ {
+ log.error("Failed to close consumer", e);
+ }
+ }
+
+ // Get the queue data
+
+ String qName =3D "notif." + UUIDGenerator.getInstance().generateS=
tringUUID();
+
+ SimpleString notifQueueName =3D new SimpleString(qName);
+
+ SimpleString filter =3D new SimpleString(ManagementHelper.HDR_BIN=
DING_TYPE + "<>" +
+ BindingType.DIVERT.toInt()=
+
+ " AND " +
+ ManagementHelper.HDR_NOTIF=
ICATION_TYPE +
+ " IN ('" +
+ NotificationType.BINDING_A=
DDED +
+ "','" +
+ NotificationType.BINDING_R=
EMOVED +
+ "','" +
+ NotificationType.CONSUMER_=
CREATED +
+ "','" +
+ NotificationType.CONSUMER_=
CLOSED +
+ "','" +
+ NotificationType.PROPOSAL +
+ "','" +
+ NotificationType.PROPOSAL_=
RESPONSE +
+ "') AND " +
+ ManagementHelper.HDR_DISTA=
NCE +
+ "<" +
+ flowRecord.getMaxHops() +
+ " AND (" +
+ ManagementHelper.HDR_ADDRE=
SS +
+ " LIKE '" +
+ flowRecord.getAddress() +
+ "%')");
+
+ session.createQueue(managementNotificationAddress, notifQueueName=
, filter, false);
+
+ notifConsumer =3D session.createConsumer(notifQueueName);
+
+ notifConsumer.setMessageHandler(flowRecord);
+
+ session.start();
+
+ ClientMessage message =3D session.createMessage(false);
+
+ ManagementHelper.putOperationInvocation(message,
+ ResourceNames.CORE_SERVER,
+ "sendQueueInfoToQueue",
+ notifQueueName.toString(),
+ flowRecord.getAddress());
+
+ ClientProducer prod =3D session.createProducer(managementAddress);
+
+ prod.send(message);
+ }
+ }
+
+ @Override
+ protected void afterConnect() throws Exception
+ {
+ setupNotificationConsumer();
+ }
+ =
+ @Override
+ protected ClientSessionFactory createSessionFactory() throws Exception
+ {
+ //We create the session factory using the specified connector
+ =
+ return serverLocator.createSessionFactory(connector); =
+ }
+
+}
--===============7404688427346820355==--