[jboss-cvs] JBoss Messaging SVN: r4130 - in trunk/src/main/org/jboss/messaging/core: server/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Apr 29 06:16:53 EDT 2008


Author: ataylor
Date: 2008-04-29 06:16:53 -0400 (Tue, 29 Apr 2008)
New Revision: 4130

Modified:
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
added functionality to allow a queue to pause its delivery; this is needed when rolling back a message to guarantee it always gets delivered in the correct sequence.

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-04-29 10:16:53 UTC (rev 4130)
@@ -108,4 +108,8 @@
    MessageReference getReference(long id);
    
    void deleteAllReferences(StorageManager storageManager) throws Exception;
+
+   void stopDelivery();
+
+   void startDelivery();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-04-29 10:16:53 UTC (rev 4130)
@@ -58,7 +58,7 @@
 	
 	void setStarted(boolean started) throws Exception;
 	
-	void handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
+	HandleStatus handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
 	
 	void promptDelivery(Queue queue);
 	

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-04-29 10:16:53 UTC (rev 4130)
@@ -97,6 +97,8 @@
    private AtomicInteger deliveringCount = new AtomicInteger(0);
    
    private volatile FlowController flowController;
+
+   private boolean delivering = true; 
    
    public QueueImpl(final long persistenceID, final String name, final Filter filter, final boolean clustered,
                     final boolean durable, final boolean temporary, final int maxSize,
@@ -447,7 +449,18 @@
    		
    	tx.commit();   	
    }
-   
+
+   public void stopDelivery()
+   {
+      delivering = false;
+   }
+
+   public void startDelivery()
+   {
+      delivering = true;
+      deliver();
+   }
+
    // Public -----------------------------------------------------------------------------
 
    public boolean equals(Object other)
@@ -580,7 +593,7 @@
 
    private HandleStatus deliver(final MessageReference reference)
    {
-      if (consumers.isEmpty())
+      if (consumers.isEmpty() || !delivering)
       {
          return HandleStatus.BUSY;
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-04-29 10:16:53 UTC (rev 4130)
@@ -206,16 +206,15 @@
                    
          try
          {
-         	sessionEndpoint.handleDelivery(ref, this);
+         	return sessionEndpoint.handleDelivery(ref, this);
          }
          catch (Exception e)
          {
          	log.error("Failed to handle delivery", e);
          	
          	started = false; // DO NOT return null or the message might get delivered more than once
+            return HandleStatus.BUSY;
          }
-                          
-         return HandleStatus.HANDLED;
       }
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-04-29 10:16:53 UTC (rev 4130)
@@ -56,13 +56,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.Delivery;
-import org.jboss.messaging.core.server.ObjectIDGenerator;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerConnection;
-import org.jboss.messaging.core.server.ServerConsumer;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.*;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -100,33 +94,33 @@
    private final boolean trace = log.isTraceEnabled();
 
    private final long id;
-   
+
    private final boolean autoCommitSends;
 
    private final boolean autoCommitAcks;
-   
+
    private final ServerConnection connection;
-   
+
    private final ResourceManager resourceManager;
 
    private final PacketSender sender;
-   
+
    private final PacketDispatcher dispatcher;
-   
+
    private final StorageManager persistenceManager;
-   
+
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-         
+
    private final PostOffice postOffice;
-   
+
    private final SecurityStore securityStore;
-   
+
    private final ObjectIDGenerator objectIDGenerator;
-         
+
    private final Set<ServerConsumer> consumers = new ConcurrentHashSet<ServerConsumer>();
 
    private final Set<ServerBrowserImpl> browsers = new ConcurrentHashSet<ServerBrowserImpl>();
-   
+
    private final Set<ServerProducer> producers = new ConcurrentHashSet<ServerProducer>();
 
    private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
@@ -137,24 +131,26 @@
 
    private Transaction tx;
 
+   private ArrayList<Queue> stopped = new ArrayList<Queue>();
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final boolean autoCommitSends,
                             final boolean autoCommitAcks,
                             final boolean xa, final ServerConnection connection,
-                            final ResourceManager resourceManager, final PacketSender sender, 
+                            final ResourceManager resourceManager, final PacketSender sender,
                             final PacketDispatcher dispatcher, final StorageManager persistenceManager,
                             final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                             final PostOffice postOffice, final SecurityStore securityStore,
                             final ObjectIDGenerator objectIDGenerator) throws Exception
    {
    	this.id = objectIDGenerator.generateID();
-            
+
       this.autoCommitSends = autoCommitSends;
 
       this.autoCommitAcks = autoCommitAcks;
-      
+
       if (!xa)
       {
          tx = new TransactionImpl(persistenceManager, postOffice);
@@ -163,21 +159,21 @@
       this.connection = connection;
 
       this.resourceManager = resourceManager;
-            
+
       this.sender = sender;
-    
+
       this.dispatcher = dispatcher;
-      
+
       this.persistenceManager = persistenceManager;
-      
+
       this.queueSettingsRepository = queueSettingsRepository;
-      
+
       this.postOffice = postOffice;
-      
+
       this.securityStore = securityStore;
-      
+
       this.objectIDGenerator = objectIDGenerator;
-            
+
       if (log.isTraceEnabled())
       {
          log.trace("created server session endpoint for " + sender.getRemoteAddress());
@@ -186,12 +182,12 @@
 
    // ServerSession implementation
    // ---------------------------------------------------------------------------------------
-   
+
    public long getID()
    {
    	return id;
    }
-   
+
    public ServerConnection getConnection()
    {
       return connection;
@@ -203,8 +199,8 @@
       {
          throw new IllegalStateException("Cannot find browser with id " + browser.getID() + " to remove");
       }
-      
-      dispatcher.unregister(browser.getID());           
+
+      dispatcher.unregister(browser.getID());
    }
 
    public void removeConsumer(final ServerConsumer consumer) throws Exception
@@ -213,29 +209,35 @@
       {
          throw new IllegalStateException("Cannot find consumer with id " + consumer.getID() + " to remove");
       }
-      
-      dispatcher.unregister(consumer.getID());           
+
+      dispatcher.unregister(consumer.getID());
    }
-   
+
    public void removeProducer(final ServerProducer producer) throws Exception
    {
       if (!producers.remove(producer))
       {
          throw new IllegalStateException("Cannot find producer with id " + producer.getID() + " to remove");
       }
-      
-      dispatcher.unregister(producer.getID());           
+
+      dispatcher.unregister(producer.getID());
    }
-   
-   public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
+
+   public synchronized HandleStatus handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
+      //if the queue we are delivering to has been stopped then don't deliver!
+      if (stopped.contains(ref.getQueue()))
+      {
+         return HandleStatus.BUSY;
+      }
       Delivery delivery = new DeliveryImpl(ref, id, consumer.getID(), deliveryIDSequence++, sender);
 
       deliveries.add(delivery);
 
       delivery.deliver();
+      return HandleStatus.HANDLED;
    }
-   
+
    public void setStarted(final boolean s) throws Exception
    {
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers);
@@ -265,7 +267,7 @@
       }
 
       browsers.clear();
-      
+
       Set<ServerProducer> producersClone = new HashSet<ServerProducer>(producers);
 
       for (ServerProducer producer: producersClone)
@@ -274,7 +276,7 @@
       }
 
       producers.clear();
-      
+
       rollback();
 
       executor.shutdown();
@@ -283,7 +285,7 @@
 
       connection.removeSession(this);
    }
-   
+
    public void promptDelivery(final Queue queue)
    {
       // TODO - do we really need to prompt on a different thread?
@@ -295,19 +297,19 @@
          }
       });
    }
-   
+
    public void send(final String address, final Message msg) throws Exception
-   {      
+   {
       //check the user has write access to this address
       securityStore.check(address, CheckType.WRITE, connection);
-      
+
       msg.setMessageID(persistenceManager.generateMessageID());
-      
+
       // This allows the no-local consumers to filter out the messages that come
       // from the same connection.
 
       msg.setConnectionID(connection.getID());
-      
+
       if (autoCommitSends)
       {
       	List<MessageReference> refs = postOffice.route(address, msg);
@@ -316,7 +318,7 @@
    		{
    			persistenceManager.storeMessage(address, msg);
    		}
-   		
+
    		for (MessageReference ref: refs)
    		{
    			ref.getQueue().addLast(ref);
@@ -366,7 +368,7 @@
                else
                {
                	tx.addAcknowledgement(ref);
-               	
+
                   //Del count is not actually updated in storage unless it's cancelled
                   ref.incrementDeliveryCount();
                }
@@ -404,7 +406,7 @@
                else
                {
                   tx.addAcknowledgement(ref);
-                  
+
                   //Del count is not actually updated in storage unless it's cancelled
                   ref.incrementDeliveryCount();
                }
@@ -423,25 +425,40 @@
 
          tx = new TransactionImpl(persistenceManager, postOffice);
       }
-      
-      // Synchronize to prevent any new deliveries arriving during this recovery
+
+      // Synchronize to prevent any new deliveries arriving during this recovery. Also, we stop any queues that are having
+      // messages rolled back; if there are any messages in mid delivery then these will not be delivered.
       synchronized (this)
       {
          // Add any unacked deliveries into the tx. Doing this ensures all references are rolled back in the correct
          // order in a single contiguous block
 
          for (Delivery del : deliveries)
-         {         	
+         {
             tx.addAcknowledgement(del.getReference());
          }
-
+         //stop the queue delivering for all the queues where messages are being rolled back
+         List<MessageReference> acks = tx.getAcknowledgements();
+         for (MessageReference ack : acks)
+         {
+            stopped.add(ack.getQueue());
+         }
+         for (Queue queue : stopped)
+         {
+            queue.stopDelivery();
+         }
          deliveries.clear();
-
          deliveryIDSequence -= tx.getAcknowledgementsCount();
       }
+      tx.rollback(queueSettingsRepository);
+      //once we have done the rollback we can restart any queues, which will flush any awaiting deliveries
+      ArrayList<Queue> toRestart = new ArrayList<Queue>(stopped);
+      stopped.clear();
+      for (Queue queue : toRestart)
+      {
+         queue.startDelivery();
+      }
 
-      tx.rollback(queueSettingsRepository);         
-      
       tx = new TransactionImpl(persistenceManager, postOffice);
    }
 
@@ -499,7 +516,7 @@
    public void commit() throws Exception
    {
       tx.commit();
-      
+
       tx = new TransactionImpl(persistenceManager, postOffice);
    }
 
@@ -790,15 +807,15 @@
    public void addDestination(final String address, final boolean temporary) throws Exception
    {
       securityStore.check(address, CheckType.CREATE, connection);
-      
+
       if (!postOffice.addDestination(address, temporary))
       {
       	throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
       }
       else
-      {      
+      {
          if (temporary)
-         {         
+         {
             connection.addTemporaryDestination(address);
          }
       }
@@ -807,13 +824,13 @@
    public void removeDestination(final String address, final boolean temporary) throws Exception
    {
    	securityStore.check(address, CheckType.CREATE, connection);
-   	
+
       if (!postOffice.removeDestination(address, temporary))
       {
          throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST, "Address does not exist: " + address);
       }
       else
-      {      
+      {
          if (temporary)
          {
          	connection.removeTemporaryDestination(address);
@@ -876,7 +893,7 @@
       }
 
       Queue queue = binding.getQueue();
-      
+
       if (queue.getConsumerCount() != 0)
       {
       	throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
@@ -903,26 +920,26 @@
       {
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
       }
-      
+
       securityStore.check(binding.getAddress(), CheckType.READ, connection);
-      
+
       Filter filter = null;
 
       if (filterString != null)
       {
          filter = new FilterImpl(filterString);
       }
-      
+
       //Flow control values if specified on queue override those passed in from client
-      
+
       Integer queueWindowSize = queueSettingsRepository.getMatch(queueName).getConsumerWindowSize();
-      
+
       windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
-      
+
       Integer queueMaxRate = queueSettingsRepository.getMatch(queueName).getConsumerMaxRate();
-      
+
       maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
-      
+
       ServerConsumer consumer =
       	new ServerConsumerImpl(objectIDGenerator.generateID(), binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
                                 this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
@@ -932,8 +949,8 @@
       SessionCreateConsumerResponseMessage response =
       	new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
 
-      consumers.add(consumer);      
-      
+      consumers.add(consumer);
+
       return response;
    }
 
@@ -994,20 +1011,20 @@
 
    public SessionCreateBrowserResponseMessage createBrowser(final String queueName, final String selector)
          throws Exception
-   {      
+   {
       Binding binding = postOffice.getBinding(queueName);
 
       if (binding == null)
       {
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
       }
-      
+
       securityStore.check(binding.getAddress(), CheckType.READ, connection);
-      
+
       ServerBrowserImpl browser = new ServerBrowserImpl(objectIDGenerator.generateID(), this, binding.getQueue(), selector);
 
       browsers.add(browser);
-      
+
       dispatcher.register(browser.newHandler());
 
       return new SessionCreateBrowserResponseMessage(browser.getID());
@@ -1024,44 +1041,44 @@
     */
    public SessionCreateProducerResponseMessage createProducer(final String address, final int windowSize,
    		                                                     final int maxRate) throws Exception
-   { 	
+   {
    	FlowController flowController = null;
-   	
+
    	final int maxRateToUse = maxRate;
-   	
+
    	if (address != null)
    	{
    		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
    	}
-   	
+
    	ServerProducerImpl producer = new ServerProducerImpl(objectIDGenerator.generateID(), this, address, sender, flowController);
 
    	producers.add(producer);
-   	
+
    	dispatcher.register(new ServerProducerPacketHandler(producer));
-   	
+
    	final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
-   	   	   	
+
    	return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
    }
-   
+
    // Public ---------------------------------------------------------------------------------------------
-   
+
    public String toString()
    {
       return "SessionEndpoint[" + id + "]";
-   }  
-   
+   }
+
    // Private --------------------------------------------------------------------------------------------
-      
+
    private void doAck(final MessageReference ref) throws Exception
    {
    	Message message = ref.getMessage();
 
    	Queue queue = ref.getQueue();
-   	
+
 		if (message.isDurable() && queue.isDurable())
-		{            			
+		{
 			synchronized (message)
 			{
 				message.decrementDurableRefCount();
@@ -1074,11 +1091,11 @@
 				{
 					persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
 				}
-			}            			
-		} 
-		
+			}
+		}
+
 		queue.referenceAcknowledged();
    }
 
-   
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-04-29 10:16:53 UTC (rev 4130)
@@ -28,6 +28,8 @@
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 
+import java.util.List;
+
 /**
  * 
  * A JBoss Messaging internal transaction
@@ -44,7 +46,9 @@
    void rollback(HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
    
    void addMessage(String address, Message message) throws Exception;
-   
+
+   List<MessageReference> getAcknowledgements();
+
    void addAcknowledgement(MessageReference acknowledgement) throws Exception;
    
    int getAcknowledgementsCount();

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-04-29 10:16:53 UTC (rev 4130)
@@ -118,7 +118,12 @@
 		}
 	}
 
-	public void addAcknowledgement(final MessageReference acknowledgement)
+   public List<MessageReference> getAcknowledgements()
+   {
+      return new ArrayList<MessageReference>(acknowledgements);
+   }
+
+   public void addAcknowledgement(final MessageReference acknowledgement)
 			throws Exception
 	{
 		if (state != State.ACTIVE)




More information about the jboss-cvs-commits mailing list