[jboss-cvs] JBoss Messaging SVN: r4261 - in trunk/src/main/org/jboss/messaging/core: transaction and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 21 06:03:17 EDT 2008


Author: ataylor
Date: 2008-05-21 06:03:17 -0400 (Wed, 21 May 2008)
New Revision: 4261

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
aded a flag on transaction so that it can be marked as failed if a send fails. This is so a client sending messages one way will always get a fail on commit.

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-21 09:39:33 UTC (rev 4260)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-05-21 10:03:17 UTC (rev 4261)
@@ -21,21 +21,6 @@
  */
 package org.jboss.messaging.core.server.impl;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -46,24 +31,11 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.Delivery;
-import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.*;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerConnection;
-import org.jboss.messaging.core.server.ServerConsumer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -72,19 +44,27 @@
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.SimpleString;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Session implementation
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> Parts derived from
  *         JBM 1.x ServerSessionImpl by
- *
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @version <tt>$Revision: 3783 $</tt>
- *
- * $Id: ServerSessionImpl.java 3783 2008-02-25 12:15:14Z timfox $
+ *          <p/>
+ *          $Id: ServerSessionImpl.java 3783 2008-02-25 12:15:14Z timfox $
  */
 public class ServerSessionImpl implements ServerSession
 {
@@ -136,7 +116,7 @@
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
    private Transaction tx;
-   
+
    private final Object rollbackCancelLock = new Object();
 
    // Constructors
@@ -150,7 +130,7 @@
                             final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                             final PostOffice postOffice, final SecurityStore securityStore) throws Exception
    {
-   	this.id = id;
+      this.id = id;
 
       this.autoCommitSends = autoCommitSends;
 
@@ -188,7 +168,7 @@
 
    public long getID()
    {
-   	return id;
+      return id;
    }
 
    public ServerConnection getConnection()
@@ -229,16 +209,16 @@
    public void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
       Delivery delivery;
-      
+
       synchronized (rollbackCancelLock)
       {
          long nextID = deliveryIDSequence.getAndIncrement();
-         
+
          delivery = new DeliveryImpl(ref, id, consumer.getClientTargetID(), nextID, sender);
-         
+
          deliveries.add(delivery);
       }
-                 
+
       delivery.deliver();
    }
 
@@ -246,7 +226,7 @@
    {
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers);
 
-      for (ServerConsumer consumer: consumersClone)
+      for (ServerConsumer consumer : consumersClone)
       {
          consumer.setStarted(s);
       }
@@ -256,7 +236,7 @@
    {
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers);
 
-      for (ServerConsumer consumer: consumersClone)
+      for (ServerConsumer consumer : consumersClone)
       {
          consumer.close();
       }
@@ -265,7 +245,7 @@
 
       Set<ServerBrowserImpl> browsersClone = new HashSet<ServerBrowserImpl>(browsers);
 
-      for (ServerBrowserImpl browser: browsersClone)
+      for (ServerBrowserImpl browser : browsersClone)
       {
          browser.close();
       }
@@ -274,7 +254,7 @@
 
       Set<ServerProducer> producersClone = new HashSet<ServerProducer>(producers);
 
-      for (ServerProducer producer: producersClone)
+      for (ServerProducer producer : producersClone)
       {
          producer.close();
       }
@@ -297,8 +277,31 @@
 
    public void send(final ServerMessage msg) throws Exception
    {
-      //check the user has write access to this address
-      securityStore.check(msg.getDestination(), CheckType.WRITE, connection);
+      //check the user has write access to this address. 
+      try
+      {
+         securityStore.check(msg.getDestination(), CheckType.WRITE, connection);
+      }
+      catch (Exception e)
+      {
+         if (!autoCommitSends)
+         {
+            MessagingException messagingException;
+            if (e instanceof MessagingException)
+            {
+               messagingException = (MessagingException) e;
+            }
+            else
+            {
+               messagingException = new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage());
+            }
+            tx.markAsFailed(messagingException);
+         }
+         else
+         {
+            throw e;
+         }
+      }
 
       msg.setMessageID(persistenceManager.generateMessageID());
 
@@ -309,33 +312,33 @@
 
       if (autoCommitSends)
       {
-      	List<MessageReference> refs = postOffice.route(msg);
+         List<MessageReference> refs = postOffice.route(msg);
 
-   		if (msg.getDurableRefCount() != 0)
-   		{
-   			persistenceManager.storeMessage(msg);
-   		}
+         if (msg.getDurableRefCount() != 0)
+         {
+            persistenceManager.storeMessage(msg);
+         }
 
-   		for (MessageReference ref: refs)
-   		{
-   			ref.getQueue().addLast(ref);
-   		}
+         for (MessageReference ref : refs)
+         {
+            ref.getQueue().addLast(ref);
+         }
       }
       else
       {
-      	tx.addMessage(msg);
+         tx.addMessage(msg);
       }
    }
 
    public void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
    {
-   	/*
-       Note that we do not consider it an error if the deliveries cannot be found to be acked.
-   	 This can legitimately occur if a connection/session/consumer is closed
-       from inside a MessageHandlers onMessage method. In this situation the close will cancel any unacked
-       deliveries, but the subsequent call to delivered() will try and ack again and not find the last
-       delivery on the server.
-       */
+      /*
+      Note that we do not consider it an error if the deliveries cannot be found to be acked.
+      This can legitimately occur if a connection/session/consumer is closed
+      from inside a MessageHandlers onMessage method. In this situation the close will cancel any unacked
+      deliveries, but the subsequent call to delivered() will try and ack again and not find the last
+      delivery on the server.
+      */
       if (allUpTo)
       {
          // Ack all deliveries up to and including the specified id
@@ -360,11 +363,11 @@
 
                if (autoCommitAcks)
                {
-               	doAck(ref);
+                  doAck(ref);
                }
                else
                {
-               	tx.addAcknowledgement(ref);
+                  tx.addAcknowledgement(ref);
 
                   //Del count is not actually updated in storage unless it's cancelled
                   ref.incrementDeliveryCount();
@@ -398,7 +401,7 @@
 
                if (autoCommitAcks)
                {
-               	doAck(ref);
+                  doAck(ref);
                }
                else
                {
@@ -433,14 +436,14 @@
          {
             tx.addAcknowledgement(del.getReference());
          }
-        
+
          deliveries.clear();
-         
+
          deliveryIDSequence.addAndGet(-tx.getAcknowledgementsCount());
       }
-      
+
       tx.rollback(queueSettingsRepository);
-      
+
       tx = new TransactionImpl(persistenceManager, postOffice);
    }
 
@@ -497,9 +500,16 @@
 
    public void commit() throws Exception
    {
-      tx.commit();
+      try
+      {
+         tx.commit();
+      }
+      finally
+      {
+         tx = new TransactionImpl(persistenceManager, postOffice);
+      }
 
-      tx = new TransactionImpl(persistenceManager, postOffice);
+
    }
 
    public SessionXAResponseMessage XACommit(final boolean onePhase, final Xid xid) throws Exception
@@ -507,7 +517,7 @@
       if (tx != null)
       {
          final String msg = "Cannot commit, session is currently doing work in a transaction "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
@@ -521,9 +531,12 @@
          return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
       }
 
-      if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
-            XAException.XAER_PROTO,
-            "Cannot commit transaction, it is suspended " + xid); }
+      if (theTx.getState() == Transaction.State.SUSPENDED)
+      {
+         return new SessionXAResponseMessage(true,
+                 XAException.XAER_PROTO,
+                 "Cannot commit transaction, it is suspended " + xid);
+      }
 
       theTx.commit();
 
@@ -563,7 +576,7 @@
          if (theTx == null)
          {
             final String msg = "Cannot find suspended transaction to end "
-                  + xid;
+                    + xid;
 
             return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
          }
@@ -600,8 +613,11 @@
          return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
       }
 
-      if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
-            XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid); }
+      if (theTx.getState() == Transaction.State.SUSPENDED)
+      {
+         return new SessionXAResponseMessage(true,
+                 XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid);
+      }
 
       tx = theTx;
 
@@ -613,7 +629,7 @@
       if (tx != null)
       {
          final String msg = "Cannot commit, session is currently doing work in a transaction "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
@@ -627,9 +643,12 @@
          return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
       }
 
-      if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
-            XAException.XAER_PROTO,
-            "Cannot prepare transaction, it is suspended " + xid); }
+      if (theTx.getState() == Transaction.State.SUSPENDED)
+      {
+         return new SessionXAResponseMessage(true,
+                 XAException.XAER_PROTO,
+                 "Cannot prepare transaction, it is suspended " + xid);
+      }
 
       if (theTx.isEmpty())
       {
@@ -659,7 +678,7 @@
       if (tx != null)
       {
          final String msg = "Cannot resume, session is currently doing work in a transaction "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
@@ -673,9 +692,12 @@
          return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
       }
 
-      if (theTx.getState() != Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
-            XAException.XAER_PROTO,
-            "Cannot resume transaction, it is not suspended " + xid); }
+      if (theTx.getState() != Transaction.State.SUSPENDED)
+      {
+         return new SessionXAResponseMessage(true,
+                 XAException.XAER_PROTO,
+                 "Cannot resume transaction, it is not suspended " + xid);
+      }
 
       tx = theTx;
 
@@ -689,7 +711,7 @@
       if (tx != null)
       {
          final String msg = "Cannot roll back, session is currently doing work in a transaction "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
@@ -703,9 +725,12 @@
          return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
       }
 
-      if (theTx.getState() == Transaction.State.SUSPENDED) { return new SessionXAResponseMessage(true,
-            XAException.XAER_PROTO,
-            "Cannot rollback transaction, it is suspended " + xid); }
+      if (theTx.getState() == Transaction.State.SUSPENDED)
+      {
+         return new SessionXAResponseMessage(true,
+                 XAException.XAER_PROTO,
+                 "Cannot rollback transaction, it is suspended " + xid);
+      }
 
       theTx.rollback(queueSettingsRepository);
 
@@ -726,7 +751,7 @@
       if (tx != null)
       {
          final String msg = "Cannot start, session is already doing work in a transaction "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
@@ -738,7 +763,7 @@
       if (!added)
       {
          final String msg = "Cannot start, there is already a xid "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
       }
@@ -751,7 +776,7 @@
       if (tx == null)
       {
          final String msg = "Cannot suspend, session is not doing work in a transaction "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
@@ -759,7 +784,7 @@
       if (tx.getState() == Transaction.State.SUSPENDED)
       {
          final String msg = "Cannot suspend, transaction is already suspended "
-               + tx.getXid();
+                 + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
@@ -792,7 +817,7 @@
 
       if (!postOffice.addDestination(address, temporary))
       {
-      	throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
+         throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
       }
       else
       {
@@ -805,7 +830,7 @@
 
    public void removeDestination(final SimpleString address, final boolean temporary) throws Exception
    {
-   	securityStore.check(address, CheckType.CREATE, connection);
+      securityStore.check(address, CheckType.CREATE, connection);
 
       if (!postOffice.removeDestination(address, temporary))
       {
@@ -815,20 +840,20 @@
       {
          if (temporary)
          {
-         	connection.removeTemporaryDestination(address);
+            connection.removeTemporaryDestination(address);
          }
       }
    }
 
    public void createQueue(final SimpleString address, final SimpleString queueName,
-         final SimpleString filterString, boolean durable, final boolean temporary) throws Exception
+                           final SimpleString filterString, boolean durable, final boolean temporary) throws Exception
    {
       //make sure the user has privileges to create this address
       if (!postOffice.containsDestination(address))
       {
          try
          {
-         	securityStore.check(address, CheckType.CREATE, connection);
+            securityStore.check(address, CheckType.CREATE, connection);
          }
          catch (MessagingException e)
          {
@@ -855,7 +880,7 @@
       }
 
       binding = postOffice.addBinding(address, queueName, filter, durable,
-            temporary);
+              temporary);
 
       if (temporary)
       {
@@ -878,12 +903,12 @@
 
       if (queue.getConsumerCount() != 0)
       {
-      	throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
+         throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
       }
 
       if (queue.isDurable())
       {
-      	binding.getQueue().deleteAllReferences(persistenceManager);
+         binding.getQueue().deleteAllReferences(persistenceManager);
       }
 
       if (queue.isTemporary())
@@ -923,15 +948,15 @@
       maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
 
       long id = dispatcher.generateID();
-      
+
       ServerConsumer consumer =
-      	new ServerConsumerImpl(id, clientTargetID, binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
-                                this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
+              new ServerConsumerImpl(id, clientTargetID, binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
+                      this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
 
       dispatcher.register(new ServerConsumerPacketHandler(consumer));
 
       SessionCreateConsumerResponseMessage response =
-      	new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
+              new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
 
       consumers.add(consumer);
 
@@ -958,8 +983,8 @@
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
          response = new SessionQueueQueryResponseMessage(queue.isDurable(), queue.isTemporary(), queue.getMaxSize(),
-                                           queue.getConsumerCount(), queue.getMessageCount(),
-                                           filterString, binding.getAddress());
+                 queue.getConsumerCount(), queue.getMessageCount(),
+                 filterString, binding.getAddress());
       }
       else
       {
@@ -984,7 +1009,7 @@
       {
          List<Binding> bindings = postOffice.getBindingsForAddress(request.getAddress());
 
-         for (Binding binding: bindings)
+         for (Binding binding : bindings)
          {
             queueNames.add(binding.getQueue().getName());
          }
@@ -994,7 +1019,7 @@
    }
 
    public SessionCreateBrowserResponseMessage createBrowser(final SimpleString queueName, final SimpleString filterString)
-         throws Exception
+           throws Exception
    {
       Binding binding = postOffice.getBinding(queueName);
 
@@ -1006,7 +1031,7 @@
       securityStore.check(binding.getAddress(), CheckType.READ, connection);
 
       long id = dispatcher.generateID();
-      
+
       ServerBrowserImpl browser = new ServerBrowserImpl(id, this, binding.getQueue(), filterString == null ? null : filterString.toString());
 
       browsers.add(browser);
@@ -1015,41 +1040,42 @@
 
       return new SessionCreateBrowserResponseMessage(browser.getID());
    }
-   
+
    /**
     * Create a producer for the specified address
-    * @param address The address to produce too
+    *
+    * @param address    The address to produce too
     * @param windowSize - the producer window size to use for flow control.
-    * Specify -1 to disable flow control completely
-    * The actual window size used may be less than the specified window size if the queue's maxSize attribute
-    * is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
-    * specified on the queue
+    *                   Specify -1 to disable flow control completely
+    *                   The actual window size used may be less than the specified window size if the queue's maxSize attribute
+    *                   is set and there are not sufficient empty spaces in the queue, or it is overridden by any producer-window_size
+    *                   specified on the queue
     */
    public SessionCreateProducerResponseMessage createProducer(final long clientTargetID, final SimpleString address, final int windowSize,
-   		                                                     final int maxRate) throws Exception
+                                                              final int maxRate) throws Exception
    {
-   	FlowController flowController = null;
+      FlowController flowController = null;
 
-   	final int maxRateToUse = maxRate;
+      final int maxRateToUse = maxRate;
 
-   	// TODO Flow control disabled for now
-   	
+      // TODO Flow control disabled for now
+
 //   	if (address != null)
 //   	{
 //   		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
 //   	}
-   	
-   	long id = dispatcher.generateID();
 
-   	ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
+      long id = dispatcher.generateID();
 
-   	producers.add(producer);
+      ServerProducerImpl producer = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController);
 
-   	dispatcher.register(new ServerProducerPacketHandler(producer));
+      producers.add(producer);
 
-   	final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
+      dispatcher.register(new ServerProducerPacketHandler(producer));
 
-   	return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
+      final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
+
+      return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
    }
 
    // Public ---------------------------------------------------------------------------------------------
@@ -1063,25 +1089,25 @@
 
    private void doAck(final MessageReference ref) throws Exception
    {
-   	ServerMessage message = ref.getMessage();
+      ServerMessage message = ref.getMessage();
 
-   	Queue queue = ref.getQueue();
+      Queue queue = ref.getQueue();
 
-		if (message.isDurable() && queue.isDurable())
-		{
-			int count = message.decrementDurableRefCount();
+      if (message.isDurable() && queue.isDurable())
+      {
+         int count = message.decrementDurableRefCount();
 
-			if (count == 0)
-			{
-				persistenceManager.storeDelete(message.getMessageID());
-			}
-			else
-			{
-				persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
-			}			
-		}
+         if (count == 0)
+         {
+            persistenceManager.storeDelete(message.getMessageID());
+         }
+         else
+         {
+            persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
+         }
+      }
 
-		queue.referenceAcknowledged();
+      queue.referenceAcknowledged();
    }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-05-21 09:39:33 UTC (rev 4260)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-05-21 10:03:17 UTC (rev 4261)
@@ -21,50 +21,51 @@
  */
 package org.jboss.messaging.core.transaction;
 
-import javax.transaction.xa.Xid;
-
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 
+import javax.transaction.xa.Xid;
+
 /**
- * 
  * A JBoss Messaging internal transaction
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public interface Transaction
-{   
-	void prepare() throws Exception;
-	
+{
+   void prepare() throws Exception;
+
    void commit() throws Exception;
-   
+
    void rollback(HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
-   
+
    void addMessage(ServerMessage message) throws Exception;
 
    void addAcknowledgement(MessageReference acknowledgement) throws Exception;
-   
+
    int getAcknowledgementsCount();
-   
+
    long getID();
-   
+
    Xid getXid();
-   
+
    boolean isEmpty();
-   
+
    void suspend();
-   
+
    void resume();
-     
+
    State getState();
-   
+
    boolean isContainsPersistent();
-   
+
+   void markAsFailed(MessagingException messagingException);
+
    static enum State
    {
-   	ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED;
+      ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED;
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-05-21 09:39:33 UTC (rev 4260)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-05-21 10:03:17 UTC (rev 4261)
@@ -21,14 +21,7 @@
  */
 package org.jboss.messaging.core.transaction.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.transaction.xa.Xid;
-
+import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -39,310 +32,325 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 
+import javax.transaction.xa.Xid;
+import java.util.*;
+
 /**
- * 
  * A TransactionImpl
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
  */
 public class TransactionImpl implements Transaction
 {
-	private static final Logger log = Logger.getLogger(TransactionImpl.class);
+   private static final Logger log = Logger.getLogger(TransactionImpl.class);
 
-	private final StorageManager storageManager;
+   private final StorageManager storageManager;
 
-	private final PostOffice postOffice;
+   private final PostOffice postOffice;
 
-	private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+   private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
 
-	private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
+   private final List<MessageReference> acknowledgements = new ArrayList<MessageReference>();
 
-	private final Xid xid;
+   private final Xid xid;
 
-	private final long id;
+   private final long id;
 
-	private volatile State state = State.ACTIVE;
+   private volatile State state = State.ACTIVE;
 
-	private volatile boolean containsPersistent;
+   private volatile boolean containsPersistent;
 
-	public TransactionImpl(final StorageManager storageManager,
-			final PostOffice postOffice)
-	{
-		this.storageManager = storageManager;
+   private MessagingException messagingException;
 
-		this.postOffice = postOffice;
+   private boolean failed;
 
-		this.xid = null;
+   public TransactionImpl(final StorageManager storageManager,
+                          final PostOffice postOffice)
+   {
+      this.storageManager = storageManager;
 
-		this.id = storageManager.generateTransactionID();
-	}
+      this.postOffice = postOffice;
 
-	public TransactionImpl(final Xid xid, final StorageManager storageManager,
-			final PostOffice postOffice)
-	{
-		this.storageManager = storageManager;
+      this.xid = null;
 
-		this.postOffice = postOffice;
+      this.id = storageManager.generateTransactionID();
+   }
 
-		this.xid = xid;
+   public TransactionImpl(final Xid xid, final StorageManager storageManager,
+                          final PostOffice postOffice)
+   {
+      this.storageManager = storageManager;
 
-		this.id = storageManager.generateTransactionID();
-	}
+      this.postOffice = postOffice;
 
-	// Transaction implementation
-	// -----------------------------------------------------------
+      this.xid = xid;
 
-	public long getID()
-	{
-		return id;
-	}
+      this.id = storageManager.generateTransactionID();
+   }
 
-	public void addMessage(final ServerMessage message) throws Exception
-	{
-		if (state != State.ACTIVE)
-		{
-			throw new IllegalStateException("Transaction is in invalid state " + state);
-		}
+   // Transaction implementation
+   // -----------------------------------------------------------
 
-		List<MessageReference> refs = postOffice.route(message);
+   public long getID()
+   {
+      return id;
+   }
 
-		refsToAdd.addAll(refs);
+   public void addMessage(final ServerMessage message) throws Exception
+   {
+      if (state != State.ACTIVE)
+      {
+         throw new IllegalStateException("Transaction is in invalid state " + state);
+      }
 
-		if (message.getDurableRefCount() != 0)
-		{
-			storageManager.storeMessageTransactional(id, message);
+      List<MessageReference> refs = postOffice.route(message);
 
-			containsPersistent = true;
-		}
-	}
+      refsToAdd.addAll(refs);
 
+      if (message.getDurableRefCount() != 0)
+      {
+         storageManager.storeMessageTransactional(id, message);
+
+         containsPersistent = true;
+      }
+   }
+
    public void addAcknowledgement(final MessageReference acknowledgement)
-			throws Exception
-	{
-		if (state != State.ACTIVE)
-		{
-			throw new IllegalStateException("Transaction is in invalid state " + state);
-		}
-		acknowledgements.add(acknowledgement);
+           throws Exception
+   {
+      if (state != State.ACTIVE)
+      {
+         throw new IllegalStateException("Transaction is in invalid state " + state);
+      }
+      acknowledgements.add(acknowledgement);
 
-		ServerMessage message = acknowledgement.getMessage();
+      ServerMessage message = acknowledgement.getMessage();
 
-		if (message.isDurable())
-		{
-			Queue queue = acknowledgement.getQueue();
+      if (message.isDurable())
+      {
+         Queue queue = acknowledgement.getQueue();
 
-			if (queue.isDurable())
-			{
-				// Need to lock on the message to prevent a race where the ack and
-				// delete
-				// records get recorded in the log in the wrong order
+         if (queue.isDurable())
+         {
+            // Need to lock on the message to prevent a race where the ack and
+            // delete
+            // records get recorded in the log in the wrong order
 
-				// TODO For now - we just use synchronized - can probably do better
-				// locking
+            // TODO For now - we just use synchronized - can probably do better
+            // locking
 
-				synchronized (message)
-				{
-					message.decrementDurableRefCount();
+            synchronized (message)
+            {
+               message.decrementDurableRefCount();
 
-					if (message.getDurableRefCount() == 0)
-					{
-						storageManager.storeDeleteTransactional(id, message
-								.getMessageID());
-					}
-					else
-					{
-						storageManager.storeAcknowledgeTransactional(id, queue
-								.getPersistenceID(), message.getMessageID());
-					}
+               if (message.getDurableRefCount() == 0)
+               {
+                  storageManager.storeDeleteTransactional(id, message
+                          .getMessageID());
+               }
+               else
+               {
+                  storageManager.storeAcknowledgeTransactional(id, queue
+                          .getPersistenceID(), message.getMessageID());
+               }
 
-					containsPersistent = true;
-				}
-			}
-		}
+               containsPersistent = true;
+            }
+         }
+      }
 
-	}
+   }
 
-	public void prepare() throws Exception
-	{
-		if (state != State.ACTIVE)
-		{
-			throw new IllegalStateException("Transaction is in invalid state " + state);
-		}
+   public void prepare() throws Exception
+   {
+      if (state != State.ACTIVE)
+      {
+         throw new IllegalStateException("Transaction is in invalid state " + state);
+      }
 
-		if (xid == null)
-		{
-			throw new IllegalStateException("Cannot prepare non XA transaction");
-		}
+      if (xid == null)
+      {
+         throw new IllegalStateException("Cannot prepare non XA transaction");
+      }
 
-		if (containsPersistent)
-		{
-			storageManager.prepare(id);
-		}
+      if (containsPersistent)
+      {
+         storageManager.prepare(id);
+      }
 
-		state = State.PREPARED;
-	}
+      state = State.PREPARED;
+   }
 
-	public void commit() throws Exception
-	{
-		if (xid != null)
-		{
-			if (state != State.PREPARED)
-			{
-				throw new IllegalStateException("Transaction is in invalid state " + state);
-			}
-		}
-		else
-		{
-			if (state != State.ACTIVE)
-			{
-				throw new IllegalStateException("Transaction is in invalid state " + state);
-			}
-		}
+   public void commit() throws Exception
+   {
+      if (failed)
+      {
+         throw messagingException;
+      }
+      if (xid != null)
+      {
+         if (state != State.PREPARED)
+         {
+            throw new IllegalStateException("Transaction is in invalid state " + state);
+         }
+      }
+      else
+      {
+         if (state != State.ACTIVE)
+         {
+            throw new IllegalStateException("Transaction is in invalid state " + state);
+         }
+      }
 
-		if (containsPersistent)
-		{
-			storageManager.commit(id);
-		}
+      if (containsPersistent)
+      {
+         storageManager.commit(id);
+      }
 
-		for (MessageReference ref : refsToAdd)
-		{
-			ref.getQueue().addLast(ref);
-		}
+      for (MessageReference ref : refsToAdd)
+      {
+         ref.getQueue().addLast(ref);
+      }
 
-		for (MessageReference reference : acknowledgements)
-		{
-			reference.getQueue().referenceAcknowledged();
-		}
+      for (MessageReference reference : acknowledgements)
+      {
+         reference.getQueue().referenceAcknowledged();
+      }
 
-		clear();
+      clear();
 
-		state = State.COMMITTED;
-	}
+      state = State.COMMITTED;
+   }
 
-	public void rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
-	{
-		if (xid != null)
-		{
-			if (state != State.PREPARED && state != State.ACTIVE)
-			{
-				throw new IllegalStateException("Transaction is in invalid state " + state);
-			}
-		}
-		else
-		{
-			if (state != State.ACTIVE)
-			{
-				throw new IllegalStateException("Transaction is in invalid state " + state);
-		   }
-		}
+   public void rollback(final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+   {
+      if (xid != null)
+      {
+         if (state != State.PREPARED && state != State.ACTIVE)
+         {
+            throw new IllegalStateException("Transaction is in invalid state " + state);
+         }
+      }
+      else
+      {
+         if (state != State.ACTIVE)
+         {
+            throw new IllegalStateException("Transaction is in invalid state " + state);
+         }
+      }
 
-		if (containsPersistent)
-		{
-			storageManager.rollback(id);
-		}
+      if (containsPersistent)
+      {
+         storageManager.rollback(id);
+      }
 
-		Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+      Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
 
-		// We sort into lists - one for each queue involved.
-		// Then we cancel back atomicly for each queue adding list on front to
-		// guarantee ordering is preserved
+      // We sort into lists - one for each queue involved.
+      // Then we cancel back atomicly for each queue adding list on front to
+      // guarantee ordering is preserved
 
-		for (MessageReference ref : acknowledgements)
-		{
-			Queue queue = ref.getQueue();
+      for (MessageReference ref : acknowledgements)
+      {
+         Queue queue = ref.getQueue();
 
-			ServerMessage message = ref.getMessage();
+         ServerMessage message = ref.getMessage();
 
-			if (message.isDurable() && queue.isDurable())
-			{
-				// Reverse the decrements we did in the tx
-				message.incrementDurableRefCount();
-			}
+         if (message.isDurable() && queue.isDurable())
+         {
+            // Reverse the decrements we did in the tx
+            message.incrementDurableRefCount();
+         }
 
-			LinkedList<MessageReference> list = queueMap.get(queue);
+         LinkedList<MessageReference> list = queueMap.get(queue);
 
-			if (list == null)
-			{
-				list = new LinkedList<MessageReference>();
+         if (list == null)
+         {
+            list = new LinkedList<MessageReference>();
 
-				queueMap.put(queue, list);
-			}
+            queueMap.put(queue, list);
+         }
 
-			if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
-			{
-				list.add(ref);
-			}
-		}
+         if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+         {
+            list.add(ref);
+         }
+      }
 
-		for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap
-				.entrySet())
-		{
-			LinkedList<MessageReference> refs = entry.getValue();
+      for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap
+              .entrySet())
+      {
+         LinkedList<MessageReference> refs = entry.getValue();
 
-			entry.getKey().addListFirst(refs);
-		}
+         entry.getKey().addListFirst(refs);
+      }
 
-		clear();
+      clear();
 
-		state = State.ROLLEDBACK;
-	}
+      state = State.ROLLEDBACK;
+   }
 
-	public int getAcknowledgementsCount()
-	{
-		return acknowledgements.size();
-	}
+   public int getAcknowledgementsCount()
+   {
+      return acknowledgements.size();
+   }
 
-	public void suspend()
-	{
-		if (state != State.ACTIVE)
-		{
-			throw new IllegalStateException("Can only suspend active transaction");
-		}
-		state = State.SUSPENDED;
-	}
+   public void suspend()
+   {
+      if (state != State.ACTIVE)
+      {
+         throw new IllegalStateException("Can only suspend active transaction");
+      }
+      state = State.SUSPENDED;
+   }
 
-	public void resume()
-	{
-		if (state != State.SUSPENDED)
-		{
-			throw new IllegalStateException("Can only resume a suspended transaction");
-		}
-		state = State.ACTIVE;
-	}
+   public void resume()
+   {
+      if (state != State.SUSPENDED)
+      {
+         throw new IllegalStateException("Can only resume a suspended transaction");
+      }
+      state = State.ACTIVE;
+   }
 
-	public Transaction.State getState()
-	{
-		return state;
-	}
+   public Transaction.State getState()
+   {
+      return state;
+   }
 
-	public Xid getXid()
-	{
-		return xid;
-	}
+   public Xid getXid()
+   {
+      return xid;
+   }
 
-	public boolean isEmpty()
-	{
-		return refsToAdd.isEmpty() && acknowledgements.isEmpty();
-	}
+   public boolean isEmpty()
+   {
+      return refsToAdd.isEmpty() && acknowledgements.isEmpty();
+   }
 
-	public boolean isContainsPersistent()
-	{
-		return containsPersistent;
-	}
+   public boolean isContainsPersistent()
+   {
+      return containsPersistent;
+   }
 
-	public void setContainsPersistent(final boolean containsPersistent)
-	{
-		this.containsPersistent = containsPersistent;
-	}
+   public void markAsFailed(MessagingException messagingException)
+   {
+      this.failed = true;
+      this.messagingException = messagingException;
+   }
 
-	// Private
-	// -------------------------------------------------------------------
+   public void setContainsPersistent(final boolean containsPersistent)
+   {
+      this.containsPersistent = containsPersistent;
+   }
 
-	private void clear()
-	{
-		refsToAdd.clear();
+   // Private
+   // -------------------------------------------------------------------
 
-		acknowledgements.clear();
-	}
+   private void clear()
+   {
+      refsToAdd.clear();
+
+      acknowledgements.clear();
+   }
 }




More information about the jboss-cvs-commits mailing list