[jboss-cvs] JBoss Messaging SVN: r1669 - branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 30 19:45:28 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-30 19:45:26 -0500 (Thu, 30 Nov 2006)
New Revision: 1669
Modified:
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-660 - Tim provided by Tim
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-01 00:16:45 UTC (rev 1668)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-01 00:45:26 UTC (rev 1669)
@@ -21,17 +21,16 @@
*/
package org.jboss.jms.server.endpoint;
-
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-
import org.jboss.jms.client.remoting.HandleMessageResponse;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
@@ -59,9 +58,6 @@
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
* Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
* JMS Facade.
@@ -80,7 +76,7 @@
private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
- // Static --------------------------------------------------------
+ // Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -122,9 +118,9 @@
private Object lock;
private Map deliveries;
-
+
private CoreDestination dlq;
-
+
// Constructors --------------------------------------------------
protected ServerConsumerEndpoint(int id, Channel channel,
@@ -152,7 +148,7 @@
sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
this.executor = (QueuedExecutor)pool.get("consumer" + id);
-
+
// Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
// deliveries for the same consumer happen serially, since even if they are queued serially
// the actual deliveries can happen in parallel, resulting in a later one "overtaking" an
@@ -166,9 +162,9 @@
this.noLocal = noLocal;
this.destination = dest;
-
+
this.toDeliver = new ArrayList();
-
+
this.lock = new Object();
if (selector != null)
@@ -178,7 +174,7 @@
if (trace) log.trace("created selector");
}
- //FIXME -
+ //FIXME -
//We really need to get rid of this delivery list - it's only purpose in life is to solve
//the race condition where acks or cancels can come in before handle has returned - and
//that can be solved in a simpler way anyway.
@@ -187,15 +183,15 @@
//and when we do clustering we will have to replicate it too!!
//Let's GET RID OF IT!!!!!!!!!!!
this.deliveries = new LinkedHashMap();
-
+
this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
// adding the consumer to the channel
this.channel.add(this);
-
+
// prompt delivery
channel.deliver(false);
-
+
log.debug(this + " constructed");
}
@@ -207,21 +203,21 @@
public Delivery handle(DeliveryObserver observer, Routable reference, Transaction tx)
{
if (trace) { log.trace(this + " receives " + reference + " for delivery"); }
-
+
// This is ok to have outside lock - is volatile
if (bufferFull)
{
// We buffer a maximum of PREFETCH_LIMIT messages at once
-
+
if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
-
+
return null;
}
-
+
// Need to synchronized around the whole block to prevent setting started = false
// but handle is already running and a message is deposited during the stop procedure.
synchronized (lock)
- {
+ {
// If the consumer is stopped then we don't accept the message, it should go back into the
// channel for delivery later.
if (!started)
@@ -241,40 +237,40 @@
if (trace) { log.trace(this + " has the main lock, preparing the message for delivery"); }
MessageReference ref = (MessageReference)reference;
-
+
JBossMessage message = (JBossMessage)ref.getMessage();
-
+
boolean selectorRejected = !this.accept(message);
-
+
SimpleDelivery delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
-
+
if (selectorRejected)
{
return delivery;
}
-
+
if (delivery.isDone())
{
return delivery;
}
-
- deliveries.put(new Long(ref.getMessageID()), delivery);
-
+
+ deliveries.put(new Long(ref.getMessageID()), delivery);
+
// We don't send the message as-is, instead we create a MessageProxy instance. This allows
// local fields such as deliveryCount to be handled by the proxy but global data to be
// fielded by the same underlying Message instance. This allows us to avoid expensive
// copying of messages
-
+
MessageProxy mp = JBossMessage.createThinDelegate(message, ref.getDeliveryCount());
-
+
// Add the proxy to the list to deliver
-
- toDeliver.add(mp);
-
+
+ toDeliver.add(mp);
+
bufferFull = toDeliver.size() >= prefetchSize;
-
+
if (!clientConsumerFull)
- {
+ {
try
{
if (trace) { log.trace(this + " scheduling a new Deliverer"); }
@@ -285,12 +281,12 @@
log.warn("Thread interrupted", e);
}
}
-
- return delivery;
+
+ return delivery;
}
- }
-
+ }
+
// Filter implementation -----------------------------------------
public boolean accept(Routable r)
@@ -303,7 +299,7 @@
if (messageSelector != null)
{
accept = messageSelector.accept(r);
-
+
if (trace) { log.trace("message selector " + (accept ? "accepts " : "DOES NOT accept ") + "the message"); }
}
}
@@ -332,21 +328,21 @@
try
{
if (trace) { log.trace(this + " closing"); }
-
- stop();
+
+ stop();
}
catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
- }
+ }
}
-
+
public void close() throws JMSException
- {
+ {
try
{
synchronized (lock)
- {
+ {
// On close we only disconnect the consumer from the Channel we don't actually remove
// it. This is because it may still contain deliveries that may well be acknowledged
// after the consumer has closed. This is perfectly valid.
@@ -356,11 +352,11 @@
// keeping deliveries after this is closed.
if (trace) { log.trace(this + " grabbed the main lock in close()"); }
-
- disconnect();
-
+
+ disconnect();
+
JMSDispatcher.instance.unregisterTarget(new Integer(id));
-
+
// If it's a subscription, remove it
if (channel instanceof Subscription)
{
@@ -368,10 +364,10 @@
if (!sub.isRecoverable())
{
//We don't disconnect durable subs
- sub.disconnect();
- }
- }
-
+ sub.disconnect();
+ }
+ }
+
// If it's non recoverable, i.e. it's a non durable sub or a temporary queue then remove
// all its references
@@ -379,7 +375,7 @@
{
channel.removeAllReferences();
}
-
+
closed = true;
}
}
@@ -393,15 +389,15 @@
{
return closed;
}
-
+
// ConsumerEndpoint implementation -------------------------------
-
+
/*
* This is called by the client consumer to tell the server to wake up and start sending more
* messages if available
*/
public void more() throws JMSException
- {
+ {
try
{
/*
@@ -416,83 +412,90 @@
5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
though the client needs messages
*/
- this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
-
+ this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
+
//Run a deliverer to deliver any existing ones
this.executor.execute(new Deliverer());
-
+
//TODO Why do we need to wait for it to execute??
//Why not just return immediately?
-
+
//Now wait for it to execute
Future result = new Future();
-
+
this.executor.execute(new Waiter(result));
-
+
result.getResult();
-
+
//Now we know the deliverer has delivered any outstanding messages to the client buffer
-
+
channel.deliver(false);
}
catch (InterruptedException e)
{
log.warn("Thread interrupted", e);
- }
+ }
catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " more");
}
}
-
-
+
+
// Public --------------------------------------------------------
-
+
public String toString()
{
return "ConsumerEndpoint[" + id + "]";
}
-
+
public JBossDestination getDestination()
{
return destination;
}
-
+
public ServerSessionEndpoint getSessionEndpoint()
{
return sessionEndpoint;
}
-
+
public int getId()
{
return id;
}
-
+
// Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
+
+ // Protected -----------------------------------------------------
+
protected void acknowledgeTransactionally(long messageID, Transaction tx) throws Throwable
{
if (trace) { log.trace("acknowledging transactionally " + messageID); }
-
+
SingleReceiverDelivery d = null;
-
+
// The actual removal of the deliveries from the delivery list is deferred until tx commit
synchronized (lock)
{
+ if (closed)
+ {
+ //We must throw an exception to the client otherwise they may think the ack has succeeded
+ //This can happen if the connection is closed by another thread - e.g. the leasePinger
+ throw new IllegalStateException("Failed to acknowledge message, session has been closed");
+ }
+
d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
}
-
+
DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
-
+
if (deliveryCallback == null)
{
deliveryCallback = new DeliveryCallback();
tx.addKeyedCallback(deliveryCallback, this);
}
deliveryCallback.addMessageID(messageID);
-
+
if (d != null)
{
d.acknowledge(tx);
@@ -500,29 +503,36 @@
else
{
throw new IllegalStateException("Failed to acknowledge delivery " + d);
- }
- }
-
+ }
+ }
+
protected void acknowledge(long messageID) throws Throwable
- {
- // acknowledge a delivery
+ {
+ // acknowledge a delivery
SingleReceiverDelivery d;
-
+
synchronized (lock)
{
+ if (closed)
+ {
+ //We must throw an exception to the client otherwise they may think the ack has succeeded
+ //This can happen if the connection is closed by another thread - e.g. the leasePinger
+ throw new IllegalStateException("Failed to acknowledge message, session has been closed");
+ }
+
d = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
}
-
+
if (d != null)
{
d.acknowledge(null);
}
else
- {
+ {
throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
- }
+ }
}
-
+
/**
* Actually remove the consumer and clear up any deliveries it may have
* This is called by the session on session.close()
@@ -530,32 +540,33 @@
*
**/
protected void remove() throws Throwable
- {
+ {
if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
-
+
boolean wereDeliveries = false;
- for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
- {
- SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
- d.cancel();
- wereDeliveries = true;
- }
- deliveries.clear();
-
- if (!disconnected)
+ synchronized (lock)
{
- if (!closed)
+ for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
{
- close();
+ SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
+
+ d.cancel();
+ wereDeliveries = true;
}
+ deliveries.clear();
}
-
+
+ if (!disconnected && !closed)
+ {
+ close();
+ }
+
sessionEndpoint.getConnectionEndpoint().
- getServerPeer().removeConsumerEndpoint(new Integer(id));
-
+ getServerPeer().removeConsumerEndpoint(new Integer(id));
+
sessionEndpoint.removeConsumerEndpoint(id);
-
+
if (wereDeliveries)
{
//If we cancelled any deliveries we need to force a deliver on the channel
@@ -563,34 +574,34 @@
//any of the cancelled messages
channel.deliver(false);
}
- }
-
+ }
+
protected void promptDelivery()
{
channel.deliver(false);
}
-
+
protected void sendToDLQ(Long messageID, Transaction tx) throws Throwable
{
SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
-
+
if (del != null)
- {
+ {
log.warn(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
-
+
if (dlq != null)
- {
+ {
//reset delivery count to zero
del.getReference().setDeliveryCount(0);
-
+
dlq.handle(null, del.getReference(), tx);
-
- del.acknowledge(tx);
+
+ del.acknowledge(tx);
}
else
{
log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-
+
del.acknowledge(tx);
}
}
@@ -598,31 +609,44 @@
{
throw new IllegalStateException("Cannot find delivery to send to DLQ:" + id);
}
-
+
}
-
+
protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
{
- SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
-
+ SingleReceiverDelivery del;
+
+ synchronized (lock)
+ {
+ if (closed)
+ {
+ //This can happen if the connection is closed by another thread - e.g. the leasePinger
+ //We should then ignore the cancel since the delivery will have already been cancelled
+ //back to the queue
+ return;
+ }
+
+ del = (SingleReceiverDelivery)deliveries.remove(messageID);
+ }
+
if (del != null)
- {
+ {
//Cancel back to the queue
-
+
//Update the delivery count
-
+
del.getReference().setDeliveryCount(deliveryCount);
-
- del.cancel();
+
+ del.cancel();
}
else
{
throw new IllegalStateException("Cannot find delivery to cancel:" + id);
}
}
-
+
protected void start()
- {
+ {
synchronized (lock)
{
// Can't start or stop it if it is closed.
@@ -630,21 +654,21 @@
{
return;
}
-
+
if (started)
{
return;
}
-
+
started = true;
}
-
+
// Prompt delivery
channel.deliver(false);
}
-
+
protected void stop() throws Throwable
- {
+ {
//We need to:
//Stop accepting any new messages in the SCE
//Flush any messages from the SCE to the buffer
@@ -659,64 +683,58 @@
{
return;
}
-
+
started = false;
}
-
+
//Now we know no more messages will be accepted in the SCE
-
+
try
{
//Flush any messages waiting to be sent to the client
this.executor.execute(new Deliverer());
-
+
//Now wait for it to execute
Future result = new Future();
-
+
this.executor.execute(new Waiter(result));
-
+
result.getResult();
-
+
//Now we know any deliverer has delivered any outstanding messages to the client buffer
}
catch (InterruptedException e)
{
log.warn("Thread interrupted", e);
}
-
+
//Now we know that there are no in flight messages on the way to the client consumer.
-
+
//But there may be messages still in the toDeliver list since the client consumer might be full
//So we need to cancel these
-
+
if (!toDeliver.isEmpty())
{
- // this is to avoid the deadlock described in http://jira.jboss.com/jira/browse/JBMESSAGING-660
- List toDeliverCopy = new ArrayList(toDeliver.size());
synchronized (lock)
{
for (int i = toDeliver.size() - 1; i >= 0; i--)
{
- toDeliverCopy.add(toDeliver.get(i));
- }
+ MessageProxy proxy = (MessageProxy)toDeliver.get(i);
- toDeliver.clear();
- }
+ long id = proxy.getMessage().getMessageID();
- for(Iterator i = toDeliverCopy.iterator(); i.hasNext();)
- {
- MessageProxy proxy = (MessageProxy)i.next();
- long id = proxy.getMessage().getMessageID();
- cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
+ cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
+ }
}
+ toDeliver.clear();
+
bufferFull = false;
+ }
+ }
- }
- }
-
// Private -------------------------------------------------------
-
+
/**
* Disconnect this consumer from the Channel that feeds it. This method does not clear up
* deliveries
@@ -724,18 +742,18 @@
private void disconnect()
{
boolean removed = channel.remove(this);
-
+
if (removed)
{
disconnected = true;
if (trace) { log.trace(this + " removed from the channel"); }
}
}
-
- // Inner classes -------------------------------------------------
-
+
+ // Inner classes -------------------------------------------------
+
/*
- * Delivers messages to the client
+ * Delivers messages to the client
* TODO - We can make this a bit more intelligent by letting it measure the rate
* the client is consuming messages and send messages at that rate.
* This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
@@ -752,9 +770,9 @@
if (trace) { log.trace(this + " client consumer full, do nothing"); }
return;
}
-
+
List list = null;
-
+
synchronized (lock)
{
if (trace) { log.trace(this + " has the main lock, attempting delivery"); }
@@ -766,7 +784,7 @@
bufferFull = false;
}
}
-
+
if (list == null)
{
if (trace) { log.trace(this + " has a null list, returning"); }
@@ -830,7 +848,7 @@
return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
}
}
-
+
/*
* The purpose of this class is to put it on the QueuedExecutor and wait for it to run
* We can then ensure that all the Runnables in front of it on the queue have also executed
@@ -841,20 +859,20 @@
private class Waiter implements Runnable
{
Future result;
-
+
Waiter(Future result)
{
this.result = result;
}
-
+
public void run()
{
result.setResult(null);
}
}
-
+
/**
- *
+ *
* The purpose of this class is to remove deliveries from the delivery list on commit
* Each transaction has once instance of this per SCE
*
@@ -862,27 +880,27 @@
private class DeliveryCallback implements TxCallback
{
List delList = new ArrayList();
-
+
public void beforePrepare()
- {
+ {
//NOOP
}
-
+
public void beforeCommit(boolean onePhase)
- {
+ {
//NOOP
}
-
+
public void beforeRollback(boolean onePhase)
- {
+ {
//NOOP
}
-
+
public void afterPrepare()
- {
+ {
//NOOP
}
-
+
public synchronized void afterCommit(boolean onePhase) throws TransactionException
{
// Remove the deliveries from the delivery map.
@@ -890,23 +908,23 @@
while (iter.hasNext())
{
Long messageID = (Long)iter.next();
-
+
if (deliveries.remove(messageID) == null)
{
throw new TransactionException("Failed to remove delivery " + messageID);
}
}
}
-
+
public void afterRollback(boolean onePhase) throws TransactionException
- {
+ {
//NOOP
}
-
+
synchronized void addMessageID(long messageID)
{
delList.add(new Long(messageID));
}
}
-
+
}
More information about the jboss-cvs-commits
mailing list