[jboss-cvs] JBoss Messaging SVN: r2797 - in trunk/src/main/org/jboss/messaging/core/impl: clusterconnection and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 25 18:26:36 EDT 2007
Author: timfox
Date: 2007-06-25 18:26:36 -0400 (Mon, 25 Jun 2007)
New Revision: 2797
Added:
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/JoinClusterRequest.java
Log:
Missing files
Added: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2007-06-25 22:26:36 UTC (rev 2797)
@@ -0,0 +1,623 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.impl.clusterconnection;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Binding;
+import org.jboss.messaging.core.contract.ClusterNotification;
+import org.jboss.messaging.core.contract.ClusterNotificationListener;
+import org.jboss.messaging.core.contract.PostOffice;
+import org.jboss.messaging.core.contract.Queue;
+import org.jboss.messaging.core.contract.Replicator;
+
+/**
+ *
+ * This class handles connections to other nodes that are used to pull messages from remote queues to local queues
+ *
+ * TODO - Throttling - we should send a stop message if the local queue is getting full
+ *
+ * TODO - clean closing of suckers
+ *
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>20 Jun 2007
+ *
+ * $Id: $
+ *
+ */
+public class ClusterConnectionManager implements ClusterNotificationListener
+{
+ private static final Logger log = Logger.getLogger(ClusterConnectionManager.class);
+
+ private boolean trace = log.isTraceEnabled();
+
+ private Map connections;
+
+ private boolean xa;
+
+ private boolean started;
+
+ private int nodeID;
+
+ private String connectionFactoryUniqueName;
+
+ private Replicator replicator;
+
+ private PostOffice postOffice;
+
+ public ClusterConnectionManager(boolean xa, int nodeID,
+ String connectionFactoryUniqueName)
+ {
+ connections = new HashMap();
+
+ this.xa = xa;
+
+ this.nodeID = nodeID;
+
+ this.connectionFactoryUniqueName = connectionFactoryUniqueName;
+
+ if (trace) { log.trace("Created " + this); }
+ }
+
+ public void injectReplicator(Replicator replicator)
+ {
+ this.replicator = replicator;
+ }
+
+ public void injectPostOffice(PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+ }
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ if (trace) { log.trace(this + " started"); }
+
+ started = true;
+ }
+
+ public synchronized void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ Iterator iter = connections.values().iterator();
+
+ while (iter.hasNext())
+ {
+ ConnectionInfo info = (ConnectionInfo)iter.next();
+
+ info.close();
+ }
+
+ connections.clear();
+
+ started = false;
+
+ if (trace) { log.trace(this + " stopped"); }
+ }
+
+ /*
+ * We respond to two types of events -
+ *
+ * 1) Connection factory deployment / undeployment
+ *
+ * We need to know the connection factories for the nodes so we can create the connections
+ *
+ * Which connection factory should we use?
+ *
+ * Clustering uses a "special connection factory" which is deployed on each node
+ * this connection factory has no JNDI bindings - since it is only replicated via the replicator
+ * it is configured in the connection-factories-service.xml file
+ *
+ * 2) Binds or unbinds
+ *
+ * When a bind or unbind (either remote or local) of a durable queue comes in we look to see if we have a local queue of the same
+ * name, then look to see if we have a remote queue of the same name, and if so, create a connection to suck from the remote queue
+ * We need to listen for both remote and local binds since the queue may be deployed on the remote node before the local one
+ * or vice versa, in both cases we need to create the connection
+ *
+ */
+ public synchronized void notify(ClusterNotification notification)
+ {
+ if (replicator == null)
+ {
+ //Non clustered
+
+ return;
+ }
+
+ if (trace) { log.trace(this + " notification received " + notification); }
+
+ try
+ {
+ if (notification.type == ClusterNotification.TYPE_REPLICATOR_PUT && notification.data instanceof String)
+ {
+ String key = (String)notification.data;
+
+ if (key.startsWith(Replicator.CF_PREFIX))
+ {
+ //A connection factory has been deployed
+ //We are only interested in the deployment of our special connection factory
+
+ String uniqueName = key.substring(Replicator.CF_PREFIX.length());
+
+ if (uniqueName.equals(connectionFactoryUniqueName))
+ {
+ log.trace(this + " deployment of ClusterConnectionFactory");
+
+ synchronized (this)
+ {
+ ensureAllConnectionsCreated();
+
+ //Now create any suckers for already deployed remote queues - this copes with the case where the CF
+ //is deployed AFTER the queues are deployed
+
+ createAllSuckers();
+ }
+ }
+ }
+ }
+ else if (notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE && notification.data instanceof String)
+ {
+ String key = (String)notification.data;
+
+ if (key.startsWith(Replicator.CF_PREFIX))
+ {
+ //A connection factory has been undeployed
+ //We are only interested in the undeployment of our special connection factory
+
+ String uniqueName = key.substring(Replicator.CF_PREFIX.length());
+
+ if (uniqueName.equals(connectionFactoryUniqueName))
+ {
+ log.trace(this + " undeployment of ClusterConnectionFactory");
+
+ Map updatedReplicantMap = replicator.get(key);
+
+ List toRemove = new ArrayList();
+
+ Iterator iter = connections.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ Integer nid = (Integer)entry.getKey();
+
+ if (updatedReplicantMap.get(nid) == null)
+ {
+ toRemove.add(nid);
+ }
+ }
+
+ iter = toRemove.iterator();
+
+ while (iter.hasNext())
+ {
+ Integer nid = (Integer)iter.next();
+
+ ConnectionInfo info = (ConnectionInfo)connections.remove(nid);
+
+ info.close();
+ }
+ }
+ }
+ }
+ else if (notification.type == ClusterNotification.TYPE_BIND)
+ {
+ String queueName = (String)notification.data;
+
+ if (trace) { log.trace(this + " bind of queue " + queueName); }
+
+ if (notification.nodeID == this.nodeID)
+ {
+ //Local bind
+
+ if (trace) { log.trace("Local bind"); }
+
+ ensureAllConnectionsCreated();
+
+ //Loook for remote queues
+
+ Collection bindings = postOffice.getAllBindingsForQueueName(queueName);
+
+ Iterator iter = bindings.iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ //This will only create it if it doesn't already exist
+
+ if (binding.queue.getNodeID() != this.nodeID)
+ {
+ createSucker(queueName, binding.queue.getNodeID());
+ }
+ }
+ }
+ else
+ {
+ //Remote bind
+
+ if (trace) { log.trace("Remote bind"); }
+
+ ensureAllConnectionsCreated();
+
+ //Look for local queue
+
+ Binding localBinding = postOffice.getBindingForQueueName(queueName);
+
+ if (localBinding == null)
+ {
+ //This is ok - the queue was deployed on the remote node before being deployed on the local node - do nothing for now
+ }
+ else
+ {
+ //The queue has already been deployed on the local node so create a sucker
+
+ createSucker(queueName, notification.nodeID);
+ }
+ }
+ }
+ else if (notification.type == ClusterNotification.TYPE_UNBIND)
+ {
+ String queueName = (String)notification.data;
+
+ if (notification.nodeID == this.nodeID)
+ {
+ //Local unbind
+
+ //We need to remove any suckers corresponding to remote nodes
+
+ removeAllSuckers(queueName);
+ }
+ else
+ {
+ //Remote unbind
+
+ //We need to remove the sucker corresponding to the remote queue
+
+ removeSucker(queueName, notification.nodeID);
+
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to process notification", e);
+ }
+ }
+
+ public String toString()
+ {
+ return "ClusterConnectionManager:" + System.identityHashCode(this) +
+ " xa: " + xa + " nodeID: " + nodeID + " connectionFactoryName: " + connectionFactoryUniqueName;
+ }
+
+ private void ensureAllConnectionsCreated() throws Exception
+ {
+ Map updatedReplicantMap = replicator.get(Replicator.CF_PREFIX + connectionFactoryUniqueName);
+
+ // Make sure all connections are created
+
+ Iterator iter = updatedReplicantMap.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ Integer nid = (Integer)entry.getKey();
+
+ ClientConnectionFactoryDelegate delegate = (ClientConnectionFactoryDelegate)entry.getValue();
+
+ if (connections.get(nid) == null)
+ {
+ try
+ {
+ ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate));
+
+ log.trace(this + " created connection info " + info);
+
+ connections.put(nid, info);
+
+ info.start();
+
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to start connection info ", e);
+ }
+ }
+ }
+ }
+
+ private void createSucker(String queueName, int nodeID) throws Exception
+ {
+ ConnectionInfo info = (ConnectionInfo)connections.get(new Integer(nodeID));
+
+ if (info == null)
+ {
+ if (trace) { log.trace("Cluster pull connection factory has not yet been deployed on node " + nodeID); }
+
+ return;
+ }
+
+ ConnectionInfo localInfo = (ConnectionInfo)connections.get(new Integer(this.nodeID));
+
+ if (info == null)
+ {
+ if (trace) { log.trace("Cluster pull connection factory has not yet been deployed on local node"); }
+ }
+
+ //Only create if it isn't already there
+
+ if (!info.hasSucker(queueName))
+ {
+ if (trace) { log.trace("Creating Sucker for queue " + queueName + " node " + nodeID); }
+
+ // Need to lookup the local queue
+
+ Binding binding = this.postOffice.getBindingForQueueName(queueName);
+
+ Queue localQueue = binding.queue;
+
+ MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa);
+
+ info.addSucker(sucker);
+
+ sucker.start();
+
+ if (trace) { log.trace("Started it"); }
+ }
+ else
+ {
+ if (trace) { log.trace("Sucker for queue " + queueName + " node " + nodeID + " already exists, not creating it"); }
+ }
+ }
+
+ private void removeSucker(String queueName, int nodeID)
+ {
+ ConnectionInfo info = (ConnectionInfo)connections.get(new Integer(nodeID));
+
+ if (info == null)
+ {
+ //This is OK, the ClusterPullConnectionfactory might never have been deployed
+ return;
+ }
+
+ MessageSucker sucker = info.removeSucker(queueName);
+
+ if (sucker == null)
+ {
+ throw new IllegalStateException("Cannot find sucker to remove " + sucker);
+ }
+
+ sucker.stop();
+ }
+
+ private void removeAllSuckers(String queueName)
+ {
+ Iterator iter = connections.values().iterator();
+
+ while (iter.hasNext())
+ {
+ ConnectionInfo info = (ConnectionInfo)iter.next();
+
+ MessageSucker sucker = info.removeSucker(queueName);
+
+ //Sucker may not exist - this is ok
+
+ if (sucker != null)
+ {
+ sucker.stop();
+ }
+ }
+ }
+
+ private void createAllSuckers() throws Exception
+ {
+ Collection allBindings = postOffice.getAllBindings();
+
+ Iterator iter = allBindings.iterator();
+
+ Map nameMap = new HashMap();
+
+ //This can probably be greatly optimised
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ List queues = (List)nameMap.get(binding.queue.getName());
+
+ if (queues == null)
+ {
+ queues = new ArrayList();
+
+ nameMap.put(binding.queue.getName(), queues);
+ }
+
+ queues.add(binding.queue);
+ }
+
+ iter = nameMap.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ String queueName = (String)entry.getKey();
+
+ List queues = (List)entry.getValue();
+
+ //Find a local queue if any
+
+ Iterator iter2 = queues.iterator();
+
+ Queue localQueue = null;
+
+ while (iter2.hasNext())
+ {
+ Queue queue = (Queue)iter2.next();
+
+ if (queue.getNodeID() == this.nodeID)
+ {
+ localQueue = queue;
+
+ break;
+ }
+ }
+
+ if (localQueue != null)
+ {
+ iter2 = queues.iterator();
+
+ while (iter.hasNext())
+ {
+ Queue queue = (Queue)iter2.next();
+
+ if (queue.getNodeID() != this.nodeID)
+ {
+ //Now we have found a local and remote with matching names - so we can create a sucker
+
+ createSucker(queueName, queue.getNodeID());
+ }
+ }
+ }
+ }
+ }
+
+ class ConnectionInfo
+ {
+ private JBossConnectionFactory connectionFactory;
+
+ private JBossConnection connection;
+
+ private Map suckers;
+
+ private boolean started;
+
+ ConnectionInfo(JBossConnectionFactory connectionFactory) throws Exception
+ {
+ this.connectionFactory = connectionFactory;
+
+ this.suckers = new HashMap();
+ }
+
+ synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ if (connection == null)
+ {
+ connection = (JBossConnection)connectionFactory.createConnection();
+ }
+
+ connection.start();
+
+ started = true;
+ }
+
+ synchronized void stop() throws Exception
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ connection.stop();
+
+ started = false;
+ }
+
+ synchronized void close()
+ {
+ Iterator iter = suckers.values().iterator();
+
+ while (iter.hasNext())
+ {
+ MessageSucker sucker = (MessageSucker)iter.next();
+
+ sucker.stop();
+ }
+
+ suckers.clear();
+
+ try
+ {
+ connection.close();
+ }
+ catch (Throwable t)
+ {
+ if (trace) { log.trace("Failure in closing source connection", t); }
+ }
+
+ connection = null;
+
+ started = false;
+ }
+
+ synchronized boolean hasSucker(String queueName)
+ {
+ return suckers.containsKey(queueName);
+ }
+
+ synchronized void addSucker(MessageSucker sucker)
+ {
+ if (suckers.containsKey(sucker.getQueueName()))
+ {
+ throw new IllegalStateException("Already has sucker for queue " + sucker.getQueueName());
+ }
+
+ suckers.put(sucker.getQueueName(), sucker);
+ }
+
+ synchronized MessageSucker removeSucker(String queueName)
+ {
+ MessageSucker sucker = (MessageSucker)suckers.remove(queueName);
+
+ return sucker;
+ }
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-06-25 22:26:36 UTC (rev 2797)
@@ -0,0 +1,294 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.impl.clusterconnection;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossSession;
+import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.ProducerDelegate;
+import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Queue;
+import org.jboss.tm.TransactionManagerLocator;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>20 Jun 2007
+ *
+ * $Id: $
+ *
+ */
+public class MessageSucker implements MessageListener
+{
+ private static final Logger log = Logger.getLogger(MessageSucker.class);
+
+ private boolean trace = log.isTraceEnabled();
+
+ private JBossConnection sourceConnection;
+
+ private JBossConnection localConnection;
+
+ private Queue localQueue;
+
+ private SessionDelegate sourceSession;
+
+ private SessionDelegate localSession;
+
+ private ProducerDelegate producer;
+
+ private boolean started;
+
+ private boolean xa;
+
+ private TransactionManager tm;
+
+ private boolean consuming;
+
+ private ConsumerDelegate consumer;
+
+ MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection, boolean xa)
+ {
+ this.localQueue = localQueue;
+
+ this.sourceConnection = sourceConnection;
+
+ this.localConnection = localConnection;
+
+ this.xa = xa;
+
+ if (xa)
+ {
+ tm = TransactionManagerLocator.getInstance().locate();
+ }
+ }
+
+ synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ if (trace) { log.trace(this + " starting"); }
+
+ //Only support non XA for now
+
+ if (!xa)
+ {
+ //If not XA then we use a client ack session for consuming - this allows us to get the message, send it to the destination
+ //then ack the message.
+ //This means that if a failure occurs between sending and acking the message won't be lost but may get delivered
+ //twice - i.e we have dups_ok behaviour
+
+ JBossSession sess = (JBossSession)sourceConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ sourceSession = (SessionDelegate)sess.getDelegate();
+
+
+ sess = (JBossSession)localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ localSession = (SessionDelegate)sess.getDelegate();
+ }
+ else
+ {
+ JBossSession sess = (JBossSession)sourceConnection.createXASession();
+
+ sourceSession = (SessionDelegate)sess.getDelegate();
+
+ sess = (JBossSession)localConnection.createXASession();
+
+ localSession = (SessionDelegate)sess.getDelegate();
+ }
+
+ JBossDestination dest = new JBossQueue(localQueue.getName(), true);
+
+ producer = localSession.createProducerDelegate(dest);
+
+ //We create the consumer with autoFlowControl = false
+ //In this mode, the consumer does not handle it's own flow control, but it must be handled
+ //manually using changeRate() methods
+ //The local queue itself will manually send these messages depending on its state -
+ //So effectively the message buffering is handled by the local queue, not the ClientConsumer
+ consumer = sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
+
+ consumer.setMessageListener(this);
+
+ //Register ourselves with the local queue - this queue will handle flow control for us
+
+ if (trace) { log.trace(this + " Registering sucker"); }
+
+ localQueue.registerSucker(this);
+
+ if (trace) { log.trace(this + " Registered sucker"); }
+ }
+
+ synchronized void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ setConsuming(false);
+
+ //FIXME - need to do the stopping properly
+
+ localQueue.unregisterSucker(this);
+
+ try
+ {
+ sourceSession.close();
+ }
+ catch (Throwable t)
+ {
+ if (trace) { log.trace("Failure in closing source session", t); }
+ }
+
+ try
+ {
+ localSession.close();
+ }
+ catch (Throwable t)
+ {
+ if (trace) { log.trace("Failure in closing local session", t); }
+ }
+ }
+
+ public String getQueueName()
+ {
+ return this.localQueue.getName();
+ }
+
+ public void setConsuming(boolean consume)
+ {
+ if (trace) { log.trace(this + " setConsuming " + consume); }
+
+ try
+ {
+ if (consume && !consuming)
+ {
+ //Send a changeRate(1) message - to start consumption
+
+ consumer.changeRate(1f);
+
+ if (trace) { log.trace(this + " sent changeRate(1) message"); }
+
+ consuming = true;
+ }
+ else if (!consume && consuming)
+ {
+ //Send a changeRate(0) message to stop consumption
+
+ consumer.changeRate(0f);
+
+ if (trace) { log.trace(this + " sent changeRate(0) message"); }
+
+ consuming = false;
+ }
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to change rate", e);
+ }
+ }
+
+ public void onMessage(Message msg)
+ {
+ Transaction tx = null;
+
+ if (trace) { log.trace(this + " sucked message " + msg); }
+
+ try
+ {
+ boolean startTx = xa && msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT;
+
+ if (startTx)
+ {
+ //Start a JTA transaction
+
+ if (trace) { log.trace("Starting JTA transactions"); }
+
+ tm.begin();
+
+ tx = tm.getTransaction();
+
+ tx.enlistResource(sourceSession.getXAResource());
+
+ tx.enlistResource(localSession.getXAResource());
+
+ if (trace) { log.trace("Started JTA transaction"); }
+ }
+
+ producer.send(null, msg, -1, -1, Long.MIN_VALUE);
+
+ if (trace) { log.trace(this + " forwarded message to queue"); }
+
+ if (startTx)
+ {
+
+ if (trace) { log.trace("Committing JTA transaction"); }
+
+ tx.delistResource(sourceSession.getXAResource(), XAResource.TMSUCCESS);
+
+ tx.delistResource(localSession.getXAResource(), XAResource.TMSUCCESS);
+
+ tx.commit();
+
+ if (trace) { log.trace("Committed JTA transaction"); }
+ }
+ else
+ {
+ msg.acknowledge();
+
+ if (trace) { log.trace("Acnowledged message"); }
+ }
+
+ //if (queue.)
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to forward message", e);
+
+ try
+ {
+ if (tx != null) tx.rollback();
+ }
+ catch (Throwable t)
+ {
+ if (trace) { log.trace("Failed to rollback tx", t); }
+ }
+ }
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/postoffice/JoinClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/JoinClusterRequest.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/JoinClusterRequest.java 2007-06-25 22:26:36 UTC (rev 2797)
@@ -0,0 +1,84 @@
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>22 Jun 2007
+ *
+ * $Id: $
+ *
+ */
+class JoinClusterRequest extends ClusterRequest
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private int nodeId;
+
+ private PostOfficeAddressInfo info;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ JoinClusterRequest(int nodeId, PostOfficeAddressInfo info)
+ {
+ this.nodeId = nodeId;
+
+ this.info = info;
+ }
+
+ JoinClusterRequest()
+ {
+ }
+
+ // Streamable implementation --------------------------------------------------------------------
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeInt(nodeId);
+
+ info.write(out);
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ nodeId = in.readInt();
+
+ info = new PostOfficeAddressInfo();
+
+ info.read(in);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public String toString()
+ {
+ return "JoinClusterRequest[info=" + info + "]";
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ Object execute(RequestTarget office) throws Throwable
+ {
+ office.handleNodeJoined(nodeId, info);
+
+ return null;
+ }
+
+ byte getType()
+ {
+ return ClusterRequest.JOIN_CLUSTER_REQUEST;
+ }
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list