[jboss-svn-commits] JBL Code SVN: r20946 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/internal/soa/esb/couriers and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Jul 7 14:06:37 EDT 2008
Author: mark.little at jboss.com
Date: 2008-07-07 14:06:37 -0400 (Mon, 07 Jul 2008)
New Revision: 20946
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
Modified:
labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
Log:
http://jira.jboss.com/jira/browse/JBESB-1740
Modified: labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
===================================================================
(Binary files differ)
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2008-07-07 17:22:31 UTC (rev 20945)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/InVMCourier.java 2008-07-07 18:06:37 UTC (rev 20946)
@@ -23,7 +23,10 @@
package org.jboss.internal.soa.esb.couriers;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.tx.InVMXAResource;
import org.jboss.soa.esb.addressing.eprs.InVMEpr;
+import org.jboss.soa.esb.common.TransactionStrategy;
+import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.message.Message;
@@ -89,13 +92,18 @@
}
/**
- * package the ESB message into the queue
+ * package the ESB message into the queue. If this is a transactional interaction
+ * then the deliver will return immediately, but the message may not go into
+ * the queue if the transaction subsequently rolls back. The caller must monitor
+ * the transaction outcome to determine the fate of the message. For example, register
+ * a Synchronization.
*
- * @param message Message - the message to deliverAsync
+ * @param Message - the message to deliverAsync
* @return boolean - the result of the delivery
* @throws CourierException -
* if problems were encountered
*/
+
public boolean deliver(Message message) {
if (!isCourierActive()) {
return false;
@@ -105,44 +113,106 @@
return false;
}
- synchronized (messageQueue) {
+ try
+ {
/*
- * If not pass-by-reference then use a copy of
- * the input message.
+ * Are we operating within the scope of a global transaction?
*/
- if (!passByReference)
+ if (isTransactional())
{
- try
- {
- messageQueue.add(message.copy());
- }
- catch (IOException ex)
- {
- logger.warn("Could not create a copy of message to pass by value: "+ex);
-
- return false;
- }
+ /*
+ * If we are transactional, then hold off on the insertion until the transaction commits.
+ * The XAResource is given the duty of doing the insert in that case.
+ * If it doesn't commit, then the insert won't happen and the client needs to monitor
+ * the transaction outcome.
+ *
+ * We could get away with using a Synchronization rather than an XAResource
+ * since there's no durable component in this case. However, Synchronizations aren't
+ * guaranteed to be called in all circumstances and unless you've read the JTS spec. it'll
+ * make maintaining things that little bit more difficult. Using a Synchronization would
+ * be ok with JBossTS, but not necessarily other implementations. We could end up with
+ * a non-atomic outcome as a result.
+ *
+ * The downside of this is that we could force a 1PC transaction to become 2PC when in
+ * fact there's no real difference to the 1PC participant. But the upside is we get
+ * consistency!
+ */
+
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+
+ txStrategy.enlistResource(new InVMXAResource(this, message, InVMXAResource.operation.INSERT));
+
+ /*
+ * Can't do lockstep wait here because otherwise the transaction may not terminate if this
+ * is the transaction controller thread!
+ */
+
+ return true; // does not mean message got into the queue.
}
else
- messageQueue.add(message);
+ {
+ synchronized (messageQueue) {
+ /*
+ * If not pass-by-reference then use a copy of
+ * the input message.
+ */
+
+ if (!passByReference)
+ {
+ try
+ {
+ messageQueue.add(message.copy());
+ }
+ catch (IOException ex)
+ {
+ logger.warn("Could not create a copy of message to pass by value: "+ex);
+
+ return false;
+ }
+ }
+ else
+ messageQueue.add(message);
- // Notify 1 waiting pickup thread of the delivery...
- messageQueue.notify();
+ // Notify 1 waiting pickup thread of the delivery...
+ messageQueue.notify();
- if (deliveryTimeout > 0) {
- try {
- // Wait on notification from the pickup thread...
- messageQueue.wait(deliveryTimeout);
- } catch (InterruptedException e) {
- logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+ if (deliveryTimeout > 0) {
+ try {
+ // Wait on notification from the pickup thread...
+ messageQueue.wait(deliveryTimeout);
+ } catch (InterruptedException e) {
+ logger.warn("Timeout expired while waiting on message pickup on InVM queue '" + epr.getAddr().getAddress() + "'.", e);
+ }
+ }
}
+
+ return true;
}
}
-
- return true;
+ catch (final Throwable ex)
+ {
+ logger.warn("InVMCourier delivery caught: "+ex);
+
+ return false;
+ }
}
+ /**
+ * Get a mesage from the queue or wait for the specified time in case one
+ * arrives.
+ *
+ * If this is a transactional interaction then the message will be placed back
+ * on the queue if the enclosing transaction rolls back. Note that for performance
+ * reasons it is not guaranteed that the message will go back at the same relative
+ * position.
+ *
+ * @param long the time to wait if the queue is empty.
+ * @return a Message or <code>null</code> if there is nothing on the queue.
+ */
+
+ // see associated test
+
public Message pickup(long millis) {
if (!isCourierActive()) {
return null;
@@ -150,22 +220,49 @@
Message message = null;
- millis = Math.max(millis, 100);
- synchronized (messageQueue) {
- if (messageQueue.isEmpty()) {
- try {
- messageQueue.wait(millis);
- } catch (InterruptedException e) {
- logger.debug("Pickup thread '" + Thread.currentThread().getName() + "' interrupted while waiting on delivery notification or timeout.", e);
+ try
+ {
+ millis = Math.max(millis, 100);
+ synchronized (messageQueue) {
+ if (messageQueue.isEmpty()) {
+ try {
+ messageQueue.wait(millis);
+ } catch (InterruptedException e) {
+ logger.debug("Pickup thread '" + Thread.currentThread().getName() + "' interrupted while waiting on delivery notification or timeout.", e);
+ }
}
+ if (!messageQueue.isEmpty()) {
+ message = messageQueue.remove();
+ }
+
+ // Notify 1 waiting delivery thread of the pickup...
+ messageQueue.notify();
}
- if (!messageQueue.isEmpty()) {
- message = messageQueue.remove();
+
+ if (isTransactional())
+ {
+ /*
+ * Return the message, but don't remove it from the queue until the transaction
+ * commits. If the transaction rolls back then the message may not go back into the
+ * queue at the exact place it was originally: other messages may have been removed
+ * successfully by other threads. Plus, we would have to maintain a before and after
+ * image of the queue. This is more a compensation transaction.
+ */
+
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+ InVMXAResource theResource = new InVMXAResource(this, message, InVMXAResource.operation.REMOVE);
+
+ txStrategy.enlistResource(theResource);
+
+ // now we can deliver the message.
}
-
- // Notify 1 waiting delivery thread of the pickup...
- messageQueue.notify();
}
+ catch (final Throwable ex)
+ {
+ logger.warn("InVMCourier pickup caught: "+ex);
+
+ return null;
+ }
return message;
}
@@ -189,6 +286,109 @@
}
public void reset() {
- messageQueue.clear();
+ try
+ {
+ if (isTransactional())
+ logger.warn("InVMCourier reset called on transactional courier: ignoring reset for the sake of consistency!");
+ else
+ messageQueue.clear();
+ }
+ catch (final Throwable ex)
+ {
+ logger.debug("InVMCourier reset caught: "+ex);
+ }
}
+
+ /**
+ * Used by transactional resource to deliver the message to the queue
+ * in the event of a transaction commit.
+ * Almost the same as normal delivery, except no lockstep wait. Lockstep
+ * does not make sense in the scope of queued transactional delivery.
+ */
+
+ public boolean doDeliver (Message message)
+ {
+ synchronized (messageQueue)
+ {
+ /*
+ * If not pass-by-reference then use a copy of
+ * the input message.
+ */
+
+ if (!passByReference)
+ {
+ try
+ {
+ messageQueue.add(message.copy());
+ }
+ catch (IOException ex)
+ {
+ logger.warn("Could not create a copy of message to pass by value: "+ex);
+
+ return false;
+ }
+ }
+ else
+ messageQueue.add(message);
+
+ // Notify 1 waiting pickup thread of the delivery...
+ messageQueue.notify();
+ }
+
+ return true;
+ }
+
+ /**
+ * Used by transactional resource to place message back on the queue. We remove from the head
+ * but insert to the tail, for speed. If individual messages need to be delivered in some well
+ * defined order then this could cause problems. However, there are no ordering guarantees with
+ * a range of transports, and particularly not in an asynchronous environment with arbitrary
+ * network latency.
+ *
+ * If this does cause a problem then we should revisit. It will mean creating a fully transactional
+ * queue (done already in Arjuna so we may be able to lift that code). But that will affect
+ * performance. It may be easier to look at solving the ordering problem separately.
+ */
+
+ public boolean doRedeliver (Message message)
+ {
+ synchronized (messageQueue) {
+ messageQueue.add(message);
+ }
+
+ return true;
+ }
+
+ /*
+ * TODO this is used in a number of classes so should be a separate
+ * util routine.
+ */
+
+ private boolean isTransactional() throws CourierException {
+ boolean transactional;
+ try
+ {
+ TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true);
+ Object txHandle = ((txStrategy == null) ? null : txStrategy.getTransaction());
+ boolean isActive = ((txStrategy == null) ? false : txStrategy.isActive());
+
+ transactional = (txHandle != null);
+
+ /*
+ * Make sure the current transaction is still active! If we
+ * have previously slept, then the timeout may be longer than that
+ * associated with the transaction.
+ */
+
+ if (transactional && !isActive)
+ {
+ throw new CourierException("Associated transaction is no longer active!");
+ }
+ }
+ catch (TransactionStrategyException ex)
+ {
+ throw new CourierException(ex);
+ }
+ return transactional;
+ }
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-07-07 17:22:31 UTC (rev 20945)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/SqlTableCourier.java 2008-07-07 18:06:37 UTC (rev 20946)
@@ -126,9 +126,9 @@
boolean transactional = isTransactional();
- Serializable serilaizedMessage;
+ Serializable serializedMessage;
try {
- serilaizedMessage = Util.serialize(message);
+ serializedMessage = Util.serialize(message);
} catch (Exception e) {
throw new CourierTransportException("Unable to serialize ESB Message.", e);
}
@@ -139,7 +139,7 @@
PreparedStatement insertStatement = jdbcFactory.createInsertStatement(connection);
try {
insertStatement.setString(1, msgId);
- insertStatement.setObject(2, serilaizedMessage);
+ insertStatement.setObject(2, serializedMessage);
insertStatement.setString(3, State.Pending.getColumnValue());
insertStatement.setLong(4, System.currentTimeMillis());
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/internal/soa/esb/couriers/tx/InVMXAResource.java 2008-07-07 18:06:37 UTC (rev 20946)
@@ -0,0 +1,197 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.internal.soa.esb.couriers.tx;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.couriers.InVMCourier;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * This XAResource instance controls the InVM queue manipulation under the control
+ * of the transaction manager. Since InVM is inherently volatile, a crash failure
+ * will take the entire queue with it, along with the client and server, we have much
+ * less to worry about here in terms of recovery. Therefore we need to ensure that
+ * the transaction manager does not save this instance to the log, since it won't
+ * make sense upon recovery: the entire queue will have been destroyed.
+ *
+ * This does mean that we could have non-atomic outcomes in the event of a crash where
+ * durable participants (e.g., database) were involved in the same transaction. But
+ * there is nothing we can do about that, without affecting the performance benefits of
+ * InVM. We will call this out explicitly in the documentation and the user needs to
+ * be aware of the possible consequences.
+ *
+ * Could be a Synchronization if we could guarantee that afterCompletion will be called
+ * (where we would do the compensation). But since that's only a 'best effort' we need
+ * to make this an XAResource.
+ */
+
+public class InVMXAResource implements XAResource
+{
+ public enum operation { INSERT, REMOVE };
+
+ public InVMXAResource (InVMCourier courier, Message msg, final operation op)
+ {
+ _theCourier = courier;
+ _msg = msg;
+ _opcode = op;
+ }
+
+ /*
+ * During commit, we deliver the message on to the queue if we were sending a message.
+ * If we were receiving, then this is a no-op since we already changed the queue by
+ * removing the message.
+ *
+ * @see javax.transaction.xa.XAResource#commit(javax.transaction.xa.Xid, boolean)
+ */
+
+ public void commit (Xid xid, boolean onePhase) throws XAException
+ {
+ if (_opcode == operation.INSERT)
+ {
+ boolean problem = false;
+
+ try
+ {
+ problem = _theCourier.doDeliver(_msg);
+ }
+ catch (final Exception ex)
+ {
+ ex.printStackTrace();
+
+ problem = true; // oops!
+ }
+
+ if (problem) // not a lot we can do at this stage
+ {
+ _logger.warn("InVMXAResource failed to commit to the InVM queue!");
+
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+ }
+ }
+
+ public void end (Xid xid, int flags) throws XAException
+ {
+ }
+
+ public void forget (Xid xid) throws XAException
+ {
+ }
+
+ public int getTransactionTimeout () throws XAException
+ {
+ return _timeout;
+ }
+
+ public int prepare (Xid xid) throws XAException
+ {
+ return XAResource.XA_OK;
+ }
+
+ /*
+ * There is nothing to recover.
+ *
+ * @see javax.transaction.xa.XAResource#recover(int)
+ */
+
+ public Xid[] recover (int flag) throws XAException
+ {
+ return null;
+ }
+
+ /*
+ * During rollback we put the message back on the queue (tail) if we were receiving
+ * a message. If we were delivering then there is nothing to do because we did not
+ * update the queue directly, but rely on this instance to do it if the transaction
+ * commits.
+ */
+
+ public void rollback (Xid xid) throws XAException
+ {
+ if (_opcode == operation.REMOVE) // put the message back on the queue
+ {
+ /*
+ * The message goes back on the queue. This may not be the same
+ * location as it had previously, but any attempt to do a truly
+ * transactional queue will affect performance adversely, for relatively
+ * little benefit. In an asynchronous world, applications should not
+ * be written assuming that a queue (or any transport) guarantees ordering.
+ * The ordering (or lack thereof) should be dealt with at the application
+ * level, where possible.
+ *
+ * TODO we could add a queue-insertion policy that allows the developer to override
+ * how the message gets placed back into the queue.
+ */
+
+ boolean problem = false;
+
+ try
+ {
+ problem = _theCourier.doRedeliver(_msg);
+ }
+ catch (final Exception ex)
+ {
+ problem = true;
+ }
+
+ if (problem) // shouldn't get here, but ...
+ {
+ _logger.warn("InVMXAResource could not rollback and put Message on to InVM queue!");
+
+ throw new XAException(XAException.XA_HEURHAZ);
+ }
+ }
+ }
+
+ public boolean setTransactionTimeout (int seconds) throws XAException
+ {
+ _timeout = seconds;
+
+ return true;
+ }
+
+ public void start (Xid xid, int flags) throws XAException
+ {
+ }
+
+ public Message getMessage ()
+ {
+ return _msg;
+ }
+
+ public boolean isSameRM (XAResource xares) throws XAException
+ {
+ return (xares == this);
+ }
+
+ private InVMCourier _theCourier;
+ private Message _msg;
+ private operation _opcode;
+ private int _timeout;
+
+ protected static Logger _logger = Logger.getLogger(InVMXAResource.class);
+}
\ No newline at end of file
More information about the jboss-svn-commits
mailing list