[jboss-cvs] JBoss Messaging SVN: r4130 - in trunk/src/main/org/jboss/messaging/core: server/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Apr 29 06:16:53 EDT 2008
Author: ataylor
Date: 2008-04-29 06:16:53 -0400 (Tue, 29 Apr 2008)
New Revision: 4130
Modified:
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
added functionality to allow a queue to be pause its delivery, this is needed when rolling back a message to gaurantee it always gets delivered in the correct sequence.
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-04-29 10:16:53 UTC (rev 4130)
@@ -108,4 +108,8 @@
MessageReference getReference(long id);
void deleteAllReferences(StorageManager storageManager) throws Exception;
+
+ void stopDelivery();
+
+ void startDelivery();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-04-29 10:16:53 UTC (rev 4130)
@@ -58,7 +58,7 @@
void setStarted(boolean started) throws Exception;
- void handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
+ HandleStatus handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
void promptDelivery(Queue queue);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-04-29 10:16:53 UTC (rev 4130)
@@ -97,6 +97,8 @@
private AtomicInteger deliveringCount = new AtomicInteger(0);
private volatile FlowController flowController;
+
+ private boolean delivering = true;
public QueueImpl(final long persistenceID, final String name, final Filter filter, final boolean clustered,
final boolean durable, final boolean temporary, final int maxSize,
@@ -447,7 +449,18 @@
tx.commit();
}
-
+
+ public void stopDelivery()
+ {
+ delivering = false;
+ }
+
+ public void startDelivery()
+ {
+ delivering = true;
+ deliver();
+ }
+
// Public -----------------------------------------------------------------------------
public boolean equals(Object other)
@@ -580,7 +593,7 @@
private HandleStatus deliver(final MessageReference reference)
{
- if (consumers.isEmpty())
+ if (consumers.isEmpty() || !delivering)
{
return HandleStatus.BUSY;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-04-29 10:16:53 UTC (rev 4130)
@@ -206,16 +206,15 @@
try
{
- sessionEndpoint.handleDelivery(ref, this);
+ return sessionEndpoint.handleDelivery(ref, this);
}
catch (Exception e)
{
log.error("Failed to handle delivery", e);
started = false; // DO NOT return null or the message might get delivered more than once
+ return HandleStatus.BUSY;
}
-
- return HandleStatus.HANDLED;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-04-29 10:16:53 UTC (rev 4130)
@@ -56,13 +56,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.Delivery;
-import org.jboss.messaging.core.server.ObjectIDGenerator;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerConnection;
-import org.jboss.messaging.core.server.ServerConsumer;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
@@ -100,33 +94,33 @@
private final boolean trace = log.isTraceEnabled();
private final long id;
-
+
private final boolean autoCommitSends;
private final boolean autoCommitAcks;
-
+
private final ServerConnection connection;
-
+
private final ResourceManager resourceManager;
private final PacketSender sender;
-
+
private final PacketDispatcher dispatcher;
-
+
private final StorageManager persistenceManager;
-
+
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
+
private final PostOffice postOffice;
-
+
private final SecurityStore securityStore;
-
+
private final ObjectIDGenerator objectIDGenerator;
-
+
private final Set<ServerConsumer> consumers = new ConcurrentHashSet<ServerConsumer>();
private final Set<ServerBrowserImpl> browsers = new ConcurrentHashSet<ServerBrowserImpl>();
-
+
private final Set<ServerProducer> producers = new ConcurrentHashSet<ServerProducer>();
private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
@@ -137,24 +131,26 @@
private Transaction tx;
+ private ArrayList<Queue> stopped = new ArrayList<Queue>();
+
// Constructors
// ---------------------------------------------------------------------------------
public ServerSessionImpl(final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean xa, final ServerConnection connection,
- final ResourceManager resourceManager, final PacketSender sender,
+ final ResourceManager resourceManager, final PacketSender sender,
final PacketDispatcher dispatcher, final StorageManager persistenceManager,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice, final SecurityStore securityStore,
final ObjectIDGenerator objectIDGenerator) throws Exception
{
this.id = objectIDGenerator.generateID();
-
+
this.autoCommitSends = autoCommitSends;
this.autoCommitAcks = autoCommitAcks;
-
+
if (!xa)
{
tx = new TransactionImpl(persistenceManager, postOffice);
@@ -163,21 +159,21 @@
this.connection = connection;
this.resourceManager = resourceManager;
-
+
this.sender = sender;
-
+
this.dispatcher = dispatcher;
-
+
this.persistenceManager = persistenceManager;
-
+
this.queueSettingsRepository = queueSettingsRepository;
-
+
this.postOffice = postOffice;
-
+
this.securityStore = securityStore;
-
+
this.objectIDGenerator = objectIDGenerator;
-
+
if (log.isTraceEnabled())
{
log.trace("created server session endpoint for " + sender.getRemoteAddress());
@@ -186,12 +182,12 @@
// ServerSession implementation
// ---------------------------------------------------------------------------------------
-
+
public long getID()
{
return id;
}
-
+
public ServerConnection getConnection()
{
return connection;
@@ -203,8 +199,8 @@
{
throw new IllegalStateException("Cannot find browser with id " + browser.getID() + " to remove");
}
-
- dispatcher.unregister(browser.getID());
+
+ dispatcher.unregister(browser.getID());
}
public void removeConsumer(final ServerConsumer consumer) throws Exception
@@ -213,29 +209,35 @@
{
throw new IllegalStateException("Cannot find consumer with id " + consumer.getID() + " to remove");
}
-
- dispatcher.unregister(consumer.getID());
+
+ dispatcher.unregister(consumer.getID());
}
-
+
public void removeProducer(final ServerProducer producer) throws Exception
{
if (!producers.remove(producer))
{
throw new IllegalStateException("Cannot find producer with id " + producer.getID() + " to remove");
}
-
- dispatcher.unregister(producer.getID());
+
+ dispatcher.unregister(producer.getID());
}
-
- public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
+
+ public synchronized HandleStatus handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
+ //if the queue we are delivering to has been stopped then dont deliver!
+ if (stopped.contains(ref.getQueue()))
+ {
+ return HandleStatus.BUSY;
+ }
Delivery delivery = new DeliveryImpl(ref, id, consumer.getID(), deliveryIDSequence++, sender);
deliveries.add(delivery);
delivery.deliver();
+ return HandleStatus.HANDLED;
}
-
+
public void setStarted(final boolean s) throws Exception
{
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers);
@@ -265,7 +267,7 @@
}
browsers.clear();
-
+
Set<ServerProducer> producersClone = new HashSet<ServerProducer>(producers);
for (ServerProducer producer: producersClone)
@@ -274,7 +276,7 @@
}
producers.clear();
-
+
rollback();
executor.shutdown();
@@ -283,7 +285,7 @@
connection.removeSession(this);
}
-
+
public void promptDelivery(final Queue queue)
{
// TODO - do we really need to prompt on a different thread?
@@ -295,19 +297,19 @@
}
});
}
-
+
public void send(final String address, final Message msg) throws Exception
- {
+ {
//check the user has write access to this address
securityStore.check(address, CheckType.WRITE, connection);
-
+
msg.setMessageID(persistenceManager.generateMessageID());
-
+
// This allows the no-local consumers to filter out the messages that come
// from the same connection.
msg.setConnectionID(connection.getID());
-
+
if (autoCommitSends)
{
List<MessageReference> refs = postOffice.route(address, msg);
@@ -316,7 +318,7 @@
{
persistenceManager.storeMessage(address, msg);
}
-
+
for (MessageReference ref: refs)
{
ref.getQueue().addLast(ref);
@@ -366,7 +368,7 @@
else
{
tx.addAcknowledgement(ref);
-
+
//Del count is not actually updated in storage unless it's cancelled
ref.incrementDeliveryCount();
}
@@ -404,7 +406,7 @@
else
{
tx.addAcknowledgement(ref);
-
+
//Del count is not actually updated in storage unless it's cancelled
ref.incrementDeliveryCount();
}
@@ -423,25 +425,40 @@
tx = new TransactionImpl(persistenceManager, postOffice);
}
-
- // Synchronize to prevent any new deliveries arriving during this recovery
+
+ // Synchronize to prevent any new deliveries arriving during this recovery. also we stop any queues that are having
+ // messages rolled back, if their are any messages in mid delivery then these will not be delivered.
synchronized (this)
{
// Add any unacked deliveries into the tx. Doing this ensures all references are rolled back in the correct
// order in a single contiguous block
for (Delivery del : deliveries)
- {
+ {
tx.addAcknowledgement(del.getReference());
}
-
+ //stop the queue delivering for all the queues where messages are being rolled back
+ List<MessageReference> acks = tx.getAcknowledgements();
+ for (MessageReference ack : acks)
+ {
+ stopped.add(ack.getQueue());
+ }
+ for (Queue queue : stopped)
+ {
+ queue.stopDelivery();
+ }
deliveries.clear();
-
deliveryIDSequence -= tx.getAcknowledgementsCount();
}
+ tx.rollback(queueSettingsRepository);
+ //once we have done the rollbackwe can restart any queues which will flush any awaiting deliveries
+ ArrayList<Queue> toRestart = new ArrayList<Queue>(stopped);
+ stopped.clear();
+ for (Queue queue : toRestart)
+ {
+ queue.startDelivery();
+ }
- tx.rollback(queueSettingsRepository);
-
tx = new TransactionImpl(persistenceManager, postOffice);
}
@@ -499,7 +516,7 @@
public void commit() throws Exception
{
tx.commit();
-
+
tx = new TransactionImpl(persistenceManager, postOffice);
}
@@ -790,15 +807,15 @@
public void addDestination(final String address, final boolean temporary) throws Exception
{
securityStore.check(address, CheckType.CREATE, connection);
-
+
if (!postOffice.addDestination(address, temporary))
{
throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
}
else
- {
+ {
if (temporary)
- {
+ {
connection.addTemporaryDestination(address);
}
}
@@ -807,13 +824,13 @@
public void removeDestination(final String address, final boolean temporary) throws Exception
{
securityStore.check(address, CheckType.CREATE, connection);
-
+
if (!postOffice.removeDestination(address, temporary))
{
throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST, "Address does not exist: " + address);
}
else
- {
+ {
if (temporary)
{
connection.removeTemporaryDestination(address);
@@ -876,7 +893,7 @@
}
Queue queue = binding.getQueue();
-
+
if (queue.getConsumerCount() != 0)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
@@ -903,26 +920,26 @@
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
-
+
securityStore.check(binding.getAddress(), CheckType.READ, connection);
-
+
Filter filter = null;
if (filterString != null)
{
filter = new FilterImpl(filterString);
}
-
+
//Flow control values if specified on queue override those passed in from client
-
+
Integer queueWindowSize = queueSettingsRepository.getMatch(queueName).getConsumerWindowSize();
-
+
windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
-
+
Integer queueMaxRate = queueSettingsRepository.getMatch(queueName).getConsumerMaxRate();
-
+
maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
-
+
ServerConsumer consumer =
new ServerConsumerImpl(objectIDGenerator.generateID(), binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
@@ -932,8 +949,8 @@
SessionCreateConsumerResponseMessage response =
new SessionCreateConsumerResponseMessage(consumer.getID(), windowSize);
- consumers.add(consumer);
-
+ consumers.add(consumer);
+
return response;
}
@@ -994,20 +1011,20 @@
public SessionCreateBrowserResponseMessage createBrowser(final String queueName, final String selector)
throws Exception
- {
+ {
Binding binding = postOffice.getBinding(queueName);
if (binding == null)
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
-
+
securityStore.check(binding.getAddress(), CheckType.READ, connection);
-
+
ServerBrowserImpl browser = new ServerBrowserImpl(objectIDGenerator.generateID(), this, binding.getQueue(), selector);
browsers.add(browser);
-
+
dispatcher.register(browser.newHandler());
return new SessionCreateBrowserResponseMessage(browser.getID());
@@ -1024,44 +1041,44 @@
*/
public SessionCreateProducerResponseMessage createProducer(final String address, final int windowSize,
final int maxRate) throws Exception
- {
+ {
FlowController flowController = null;
-
+
final int maxRateToUse = maxRate;
-
+
if (address != null)
{
flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
}
-
+
ServerProducerImpl producer = new ServerProducerImpl(objectIDGenerator.generateID(), this, address, sender, flowController);
producers.add(producer);
-
+
dispatcher.register(new ServerProducerPacketHandler(producer));
-
+
final int windowToUse = flowController == null ? -1 : flowController.getInitialTokens(windowSize, producer);
-
+
return new SessionCreateProducerResponseMessage(producer.getID(), windowToUse, maxRateToUse);
}
-
+
// Public ---------------------------------------------------------------------------------------------
-
+
public String toString()
{
return "SessionEndpoint[" + id + "]";
- }
-
+ }
+
// Private --------------------------------------------------------------------------------------------
-
+
private void doAck(final MessageReference ref) throws Exception
{
Message message = ref.getMessage();
Queue queue = ref.getQueue();
-
+
if (message.isDurable() && queue.isDurable())
- {
+ {
synchronized (message)
{
message.decrementDurableRefCount();
@@ -1074,11 +1091,11 @@
{
persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
}
- }
- }
-
+ }
+ }
+
queue.referenceAcknowledged();
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-04-29 10:16:53 UTC (rev 4130)
@@ -28,6 +28,8 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
+import java.util.List;
+
/**
*
* A JBoss Messaging internal transaction
@@ -44,7 +46,9 @@
void rollback(HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
void addMessage(String address, Message message) throws Exception;
-
+
+ List<MessageReference> getAcknowledgements();
+
void addAcknowledgement(MessageReference acknowledgement) throws Exception;
int getAcknowledgementsCount();
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-04-29 09:08:48 UTC (rev 4129)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-04-29 10:16:53 UTC (rev 4130)
@@ -118,7 +118,12 @@
}
}
- public void addAcknowledgement(final MessageReference acknowledgement)
+ public List<MessageReference> getAcknowledgements()
+ {
+ return new ArrayList<MessageReference>(acknowledgements);
+ }
+
+ public void addAcknowledgement(final MessageReference acknowledgement)
throws Exception
{
if (state != State.ACTIVE)
More information about the jboss-cvs-commits
mailing list