From do-not-reply at jboss.org Thu Jul 1 14:00:36 2010 Content-Type: multipart/mixed; boundary="===============7404688427346820355==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r9378 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl and 1 other directory. Date: Thu, 01 Jul 2010 14:00:36 -0400 Message-ID: <201007011800.o61I0aQf022749@svn01.web.mwc.hst.phx2.redhat.com> --===============7404688427346820355== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: timfox Date: 2010-07-01 14:00:35 -0400 (Thu, 01 Jul 2010) New Revision: 9378 Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/i= mpl/wireformat/SubscribeClusterTopologyUpdatesMessage.java branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/= impl/ClusterConnectionBridge.java Log: HA improvements Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/co= re/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/= impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java = (rev 0) +++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/= impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java 2010-07-01 18:0= 0:35 UTC (rev 9378) @@ -0,0 +1,77 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.protocol.core.impl.wireformat; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.core.logging.Logger; +import org.hornetq.core.protocol.core.impl.PacketImpl; + +/** + * @author Tim Fox + * + */ +public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl +{ + // Constants ----------------------------------------------------- + + private static final Logger log =3D Logger.getLogger(SubscribeClusterTo= pologyUpdatesMessage.class); + + // Attributes ---------------------------------------------------- + + private boolean clusterConnection; + = + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public SubscribeClusterTopologyUpdatesMessage(final boolean clusterConn= ection) + { + super(PacketImpl.SUBSCRIBE_TOPOLOGY); + + this.clusterConnection =3D clusterConnection; + } + + public SubscribeClusterTopologyUpdatesMessage() + { + super(PacketImpl.SUBSCRIBE_TOPOLOGY); + } + + // Public -------------------------------------------------------- + + public boolean isClusterConnection() + { + return clusterConnection; + } + = + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeBoolean(clusterConnection); = + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + clusterConnection =3D buffer.readBoolean(); + } + + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/clus= ter/impl/ClusterConnectionBridge.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster= /impl/ClusterConnectionBridge.java (rev 0) +++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster= /impl/ClusterConnectionBridge.java 2010-07-01 18:00:35 UTC (rev 9378) @@ -0,0 +1,227 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.server.cluster.impl; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientConsumer; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.api.core.management.ManagementHelper; +import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.ResourceNames; +import org.hornetq.core.logging.Logger; +import org.hornetq.core.message.impl.MessageImpl; +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.postoffice.BindingType; +import org.hornetq.core.server.Queue; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.cluster.MessageFlowRecord; +import org.hornetq.core.server.cluster.Transformer; +import org.hornetq.utils.UUID; +import org.hornetq.utils.UUIDGenerator; + +/** + * A ClusterConnectionBridge + * + * @author tim + * + * + */ +public class ClusterConnectionBridge extends BridgeImpl +{ + private static final Logger log =3D Logger.getLogger(ClusterConnectionB= ridge.class); + + private final MessageFlowRecord flowRecord; + + private final SimpleString managementAddress; + + private final SimpleString managementNotificationAddress; + + private ClientConsumer notifConsumer; + + private final SimpleString idsHeaderName; + = + private final TransportConfiguration connector; + + public ClusterConnectionBridge(final ServerLocator serverLocator, + final UUID nodeUUID, + final SimpleString name, + final Queue queue, + final Executor executor, + final SimpleString filterString, + final SimpleString forwardingAddress, + final ScheduledExecutorService scheduled= Executor, + final Transformer transformer, + final boolean useDuplicateDetection, + final String user, + final String password, + final boolean activated, + final StorageManager storageManager, + final SimpleString managementAddress, + final SimpleString managementNotificatio= nAddress, + final MessageFlowRecord flowRecord, + final TransportConfiguration connector) = throws Exception + { + super(serverLocator, + nodeUUID, + name, + queue, + executor, + filterString, + forwardingAddress, + scheduledExecutor, + transformer, + useDuplicateDetection, + user, + password, + activated, + storageManager); + + idsHeaderName =3D MessageImpl.HDR_ROUTE_TO_IDS.concat(name); + + this.managementAddress =3D managementAddress; + this.managementNotificationAddress =3D managementNotificationAddress; + this.flowRecord =3D flowRecord; + this.connector =3D connector; + } + + @Override + protected ServerMessage beforeForward(ServerMessage message) + { + // We make a copy of the message, then we strip out the unwanted rou= ting id headers and leave + // only + // the one pertinent for the address node - this is important since = different queues on different + // nodes could have same queue ids + // Note we must copy since same message may get routed to other node= s which require different headers + message =3D message.copy(); + + // TODO - we can optimise this + + Set propNames =3D new HashSet(message.ge= tPropertyNames()); + + byte[] queueIds =3D message.getBytesProperty(idsHeaderName); + + for (SimpleString propName : propNames) + { + if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) + { + message.removeProperty(propName); + } + } + + message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds); + + message =3D super.beforeForward(message); + + return message; + } + + private void setupNotificationConsumer() throws Exception + { + if (flowRecord !=3D null) + { + flowRecord.reset(); + + if (notifConsumer !=3D null) + { + try + { + notifConsumer.close(); + + notifConsumer =3D null; + } + catch (HornetQException e) + { + log.error("Failed to close consumer", e); + } + } + + // Get the queue data + + String qName =3D "notif." + UUIDGenerator.getInstance().generateS= tringUUID(); + + SimpleString notifQueueName =3D new SimpleString(qName); + + SimpleString filter =3D new SimpleString(ManagementHelper.HDR_BIN= DING_TYPE + "<>" + + BindingType.DIVERT.toInt()= + + " AND " + + ManagementHelper.HDR_NOTIF= ICATION_TYPE + + " IN ('" + + NotificationType.BINDING_A= DDED + + "','" + + NotificationType.BINDING_R= EMOVED + + "','" + + NotificationType.CONSUMER_= CREATED + + "','" + + NotificationType.CONSUMER_= CLOSED + + "','" + + NotificationType.PROPOSAL + + "','" + + NotificationType.PROPOSAL_= RESPONSE + + "') AND " + + ManagementHelper.HDR_DISTA= NCE + + "<" + + flowRecord.getMaxHops() + + " AND (" + + ManagementHelper.HDR_ADDRE= SS + + " LIKE '" + + flowRecord.getAddress() + + "%')"); + + session.createQueue(managementNotificationAddress, notifQueueName= , filter, false); + + notifConsumer =3D session.createConsumer(notifQueueName); + + notifConsumer.setMessageHandler(flowRecord); + + session.start(); + + ClientMessage message =3D session.createMessage(false); + + ManagementHelper.putOperationInvocation(message, + ResourceNames.CORE_SERVER, + "sendQueueInfoToQueue", + notifQueueName.toString(), + flowRecord.getAddress()); + + ClientProducer prod =3D session.createProducer(managementAddress); + + prod.send(message); + } + } + + @Override + protected void afterConnect() throws Exception + { + setupNotificationConsumer(); + } + = + @Override + protected ClientSessionFactory createSessionFactory() throws Exception + { + //We create the session factory using the specified connector + = + return serverLocator.createSessionFactory(connector); = + } + +} --===============7404688427346820355==--