[jboss-cvs] JBoss Messaging SVN: r1672 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 1 14:37:17 EST 2006
Author: timfox
Date: 2006-12-01 14:37:10 -0500 (Fri, 01 Dec 2006)
New Revision: 1672
Modified:
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
More for patch
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-01 19:37:10 UTC (rev 1672)
@@ -76,7 +76,7 @@
private static final Logger log = Logger.getLogger(ServerConsumerEndpoint.class);
- // Static --------------------------------------------------------
+ // Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -118,9 +118,9 @@
private Object lock;
private Map deliveries;
-
+
private CoreDestination dlq;
-
+
// Constructors --------------------------------------------------
protected ServerConsumerEndpoint(int id, Channel channel,
@@ -148,7 +148,7 @@
sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
this.executor = (QueuedExecutor)pool.get("consumer" + id);
-
+
// Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
// deliveries for the same consumer happen serially, since even if they are queued serially
// the actual deliveries can happen in parallel, resulting in a later one "overtaking" an
@@ -162,9 +162,9 @@
this.noLocal = noLocal;
this.destination = dest;
-
+
this.toDeliver = new ArrayList();
-
+
this.lock = new Object();
if (selector != null)
@@ -174,7 +174,7 @@
if (trace) log.trace("created selector");
}
- //FIXME -
+ //FIXME -
//We really need to get rid of this delivery list - it's only purpose in life is to solve
//the race condition where acks or cancels can come in before handle has returned - and
//that can be solved in a simpler way anyway.
@@ -183,15 +183,15 @@
//and when we do clustering we will have to replicate it too!!
//Let's GET RID OF IT!!!!!!!!!!!
this.deliveries = new LinkedHashMap();
-
+
this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
// adding the consumer to the channel
this.channel.add(this);
-
+
// prompt delivery
channel.deliver(false);
-
+
log.debug(this + " constructed");
}
@@ -203,21 +203,21 @@
public Delivery handle(DeliveryObserver observer, Routable reference, Transaction tx)
{
if (trace) { log.trace(this + " receives " + reference + " for delivery"); }
-
+
// This is ok to have outside lock - is volatile
if (bufferFull)
{
// We buffer a maximum of PREFETCH_LIMIT messages at once
-
+
if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
-
+
return null;
}
-
+
// Need to synchronized around the whole block to prevent setting started = false
// but handle is already running and a message is deposited during the stop procedure.
synchronized (lock)
- {
+ {
// If the consumer is stopped then we don't accept the message, it should go back into the
// channel for delivery later.
if (!started)
@@ -237,40 +237,40 @@
if (trace) { log.trace(this + " has the main lock, preparing the message for delivery"); }
MessageReference ref = (MessageReference)reference;
-
+
JBossMessage message = (JBossMessage)ref.getMessage();
-
+
boolean selectorRejected = !this.accept(message);
-
+
SimpleDelivery delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
-
+
if (selectorRejected)
{
return delivery;
}
-
+
if (delivery.isDone())
{
return delivery;
}
-
- deliveries.put(new Long(ref.getMessageID()), delivery);
-
+
+ deliveries.put(new Long(ref.getMessageID()), delivery);
+
// We don't send the message as-is, instead we create a MessageProxy instance. This allows
// local fields such as deliveryCount to be handled by the proxy but global data to be
// fielded by the same underlying Message instance. This allows us to avoid expensive
// copying of messages
-
+
MessageProxy mp = JBossMessage.createThinDelegate(message, ref.getDeliveryCount());
-
+
// Add the proxy to the list to deliver
-
- toDeliver.add(mp);
-
+
+ toDeliver.add(mp);
+
bufferFull = toDeliver.size() >= prefetchSize;
-
+
if (!clientConsumerFull)
- {
+ {
try
{
if (trace) { log.trace(this + " scheduling a new Deliverer"); }
@@ -281,12 +281,12 @@
log.warn("Thread interrupted", e);
}
}
-
- return delivery;
+
+ return delivery;
}
- }
+ }
+
-
// Filter implementation -----------------------------------------
public boolean accept(Routable r)
@@ -299,7 +299,7 @@
if (messageSelector != null)
{
accept = messageSelector.accept(r);
-
+
if (trace) { log.trace("message selector " + (accept ? "accepts " : "DOES NOT accept ") + "the message"); }
}
}
@@ -328,21 +328,21 @@
try
{
if (trace) { log.trace(this + " closing"); }
-
- stop();
+
+ stop();
}
catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
- }
+ }
}
-
+
public void close() throws JMSException
- {
+ {
try
{
synchronized (lock)
- {
+ {
// On close we only disconnect the consumer from the Channel we don't actually remove
// it. This is because it may still contain deliveries that may well be acknowledged
// after the consumer has closed. This is perfectly valid.
@@ -352,11 +352,11 @@
// keeping deliveries after this is closed.
if (trace) { log.trace(this + " grabbed the main lock in close()"); }
-
- disconnect();
-
+
+ disconnect();
+
JMSDispatcher.instance.unregisterTarget(new Integer(id));
-
+
// If it's a subscription, remove it
if (channel instanceof Subscription)
{
@@ -364,10 +364,10 @@
if (!sub.isRecoverable())
{
//We don't disconnect durable subs
- sub.disconnect();
- }
- }
-
+ sub.disconnect();
+ }
+ }
+
// If it's non recoverable, i.e. it's a non durable sub or a temporary queue then remove
// all its references
@@ -375,7 +375,7 @@
{
channel.removeAllReferences();
}
-
+
closed = true;
}
}
@@ -389,15 +389,15 @@
{
return closed;
}
-
+
// ConsumerEndpoint implementation -------------------------------
-
+
/*
* This is called by the client consumer to tell the server to wake up and start sending more
* messages if available
*/
public void more() throws JMSException
- {
+ {
try
{
/*
@@ -412,90 +412,83 @@
5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
though the client needs messages
*/
- this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
-
+ this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
+
//Run a deliverer to deliver any existing ones
this.executor.execute(new Deliverer());
-
+
//TODO Why do we need to wait for it to execute??
//Why not just return immediately?
-
+
//Now wait for it to execute
Future result = new Future();
-
+
this.executor.execute(new Waiter(result));
-
+
result.getResult();
-
+
//Now we know the deliverer has delivered any outstanding messages to the client buffer
-
+
channel.deliver(false);
}
catch (InterruptedException e)
{
log.warn("Thread interrupted", e);
- }
+ }
catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " more");
}
}
-
-
+
+
// Public --------------------------------------------------------
-
+
public String toString()
{
return "ConsumerEndpoint[" + id + "]";
}
-
+
public JBossDestination getDestination()
{
return destination;
}
-
+
public ServerSessionEndpoint getSessionEndpoint()
{
return sessionEndpoint;
}
-
+
public int getId()
{
return id;
}
-
+
// Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
+
+ // Protected -----------------------------------------------------
+
protected void acknowledgeTransactionally(long messageID, Transaction tx) throws Throwable
{
if (trace) { log.trace("acknowledging transactionally " + messageID); }
-
+
SingleReceiverDelivery d = null;
-
+
// The actual removal of the deliveries from the delivery list is deferred until tx commit
synchronized (lock)
{
- if (closed)
- {
- //We must throw an exception to the client otherwise they may think the ack has succeeded
- //This can happen if the connection is closed by another thread - e.g. the leasePinger
- throw new IllegalStateException("Failed to acknowledge message, session has been closed");
- }
-
d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
}
-
+
DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
-
+
if (deliveryCallback == null)
{
deliveryCallback = new DeliveryCallback();
tx.addKeyedCallback(deliveryCallback, this);
}
deliveryCallback.addMessageID(messageID);
-
+
if (d != null)
{
d.acknowledge(tx);
@@ -503,36 +496,29 @@
else
{
throw new IllegalStateException("Failed to acknowledge delivery " + d);
- }
- }
-
+ }
+ }
+
protected void acknowledge(long messageID) throws Throwable
- {
- // acknowledge a delivery
+ {
+ // acknowledge a delivery
SingleReceiverDelivery d;
-
+
synchronized (lock)
{
- if (closed)
- {
- //We must throw an exception to the client otherwise they may think the ack has succeeded
- //This can happen if the connection is closed by another thread - e.g. the leasePinger
- throw new IllegalStateException("Failed to acknowledge message, session has been closed");
- }
-
d = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
}
-
+
if (d != null)
{
d.acknowledge(null);
}
else
- {
+ {
throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
- }
+ }
}
-
+
/**
* Actually remove the consumer and clear up any deliveries it may have
* This is called by the session on session.close()
@@ -540,33 +526,32 @@
*
**/
protected void remove() throws Throwable
- {
+ {
if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
-
+
boolean wereDeliveries = false;
+ for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
+ {
+ SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
- synchronized (lock)
+ d.cancel();
+ wereDeliveries = true;
+ }
+ deliveries.clear();
+
+ if (!disconnected)
{
- for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
+ if (!closed)
{
- SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
-
- d.cancel();
- wereDeliveries = true;
+ close();
}
- deliveries.clear();
}
-
- if (!disconnected && !closed)
- {
- close();
- }
-
+
sessionEndpoint.getConnectionEndpoint().
- getServerPeer().removeConsumerEndpoint(new Integer(id));
-
+ getServerPeer().removeConsumerEndpoint(new Integer(id));
+
sessionEndpoint.removeConsumerEndpoint(id);
-
+
if (wereDeliveries)
{
//If we cancelled any deliveries we need to force a deliver on the channel
@@ -574,34 +559,34 @@
//any of the cancelled messages
channel.deliver(false);
}
- }
-
+ }
+
protected void promptDelivery()
{
channel.deliver(false);
}
-
+
protected void sendToDLQ(Long messageID, Transaction tx) throws Throwable
{
SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
-
+
if (del != null)
- {
+ {
log.warn(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
-
+
if (dlq != null)
- {
+ {
//reset delivery count to zero
del.getReference().setDeliveryCount(0);
-
+
dlq.handle(null, del.getReference(), tx);
-
- del.acknowledge(tx);
+
+ del.acknowledge(tx);
}
else
{
log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-
+
del.acknowledge(tx);
}
}
@@ -609,44 +594,31 @@
{
throw new IllegalStateException("Cannot find delivery to send to DLQ:" + id);
}
-
+
}
-
+
protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
{
- SingleReceiverDelivery del;
-
- synchronized (lock)
- {
- if (closed)
- {
- //This can happen if the connection is closed by another thread - e.g. the leasePinger
- //We should then ignore the cancel since the delivery will have already been cancelled
- //back to the queue
- return;
- }
-
- del = (SingleReceiverDelivery)deliveries.remove(messageID);
- }
-
+ SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
+
if (del != null)
- {
+ {
//Cancel back to the queue
-
+
//Update the delivery count
-
+
del.getReference().setDeliveryCount(deliveryCount);
-
- del.cancel();
+
+ del.cancel();
}
else
{
throw new IllegalStateException("Cannot find delivery to cancel:" + id);
}
}
-
+
protected void start()
- {
+ {
synchronized (lock)
{
// Can't start or stop it if it is closed.
@@ -654,21 +626,21 @@
{
return;
}
-
+
if (started)
{
return;
}
-
+
started = true;
}
-
+
// Prompt delivery
channel.deliver(false);
}
-
+
protected void stop() throws Throwable
- {
+ {
//We need to:
//Stop accepting any new messages in the SCE
//Flush any messages from the SCE to the buffer
@@ -683,58 +655,58 @@
{
return;
}
-
+
started = false;
}
-
+
//Now we know no more messages will be accepted in the SCE
-
+
try
{
//Flush any messages waiting to be sent to the client
this.executor.execute(new Deliverer());
-
+
//Now wait for it to execute
Future result = new Future();
-
+
this.executor.execute(new Waiter(result));
-
+
result.getResult();
-
+
//Now we know any deliverer has delivered any outstanding messages to the client buffer
}
catch (InterruptedException e)
{
log.warn("Thread interrupted", e);
}
-
+
//Now we know that there are no in flight messages on the way to the client consumer.
-
+
//But there may be messages still in the toDeliver list since the client consumer might be full
//So we need to cancel these
-
+
if (!toDeliver.isEmpty())
- {
+ {
synchronized (lock)
{
for (int i = toDeliver.size() - 1; i >= 0; i--)
{
MessageProxy proxy = (MessageProxy)toDeliver.get(i);
-
+
long id = proxy.getMessage().getMessageID();
-
+
cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
}
}
-
+
toDeliver.clear();
-
+
bufferFull = false;
- }
+ }
}
-
+
// Private -------------------------------------------------------
-
+
/**
* Disconnect this consumer from the Channel that feeds it. This method does not clear up
* deliveries
@@ -742,18 +714,18 @@
private void disconnect()
{
boolean removed = channel.remove(this);
-
+
if (removed)
{
disconnected = true;
if (trace) { log.trace(this + " removed from the channel"); }
}
}
-
- // Inner classes -------------------------------------------------
-
+
+ // Inner classes -------------------------------------------------
+
/*
- * Delivers messages to the client
+ * Delivers messages to the client
* TODO - We can make this a bit more intelligent by letting it measure the rate
* the client is consuming messages and send messages at that rate.
* This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
@@ -770,9 +742,9 @@
if (trace) { log.trace(this + " client consumer full, do nothing"); }
return;
}
-
+
List list = null;
-
+
synchronized (lock)
{
if (trace) { log.trace(this + " has the main lock, attempting delivery"); }
@@ -784,7 +756,7 @@
bufferFull = false;
}
}
-
+
if (list == null)
{
if (trace) { log.trace(this + " has a null list, returning"); }
@@ -796,20 +768,7 @@
try
{
- if (trace)
- {
- StringBuffer sb = new StringBuffer(ServerConsumerEndpoint.this + " handing [");
- for(int i = 0; i < list.size(); i++)
- {
- sb.append(((MessageProxy)list.get(i)).getMessage().getMessageID());
- if (i < list.size() - 1)
- {
- sb.append(",");
- }
- }
- sb.append("] over to the remoting layer");
- log.trace(sb.toString());
- }
+ if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
ClientDelivery del = new ClientDelivery(list, id);
@@ -848,7 +807,7 @@
return "Deliverer[" + Integer.toHexString(hashCode()) + "]";
}
}
-
+
/*
* The purpose of this class is to put it on the QueuedExecutor and wait for it to run
* We can then ensure that all the Runnables in front of it on the queue have also executed
@@ -859,20 +818,20 @@
private class Waiter implements Runnable
{
Future result;
-
+
Waiter(Future result)
{
this.result = result;
}
-
+
public void run()
{
result.setResult(null);
}
}
-
+
/**
- *
+ *
* The purpose of this class is to remove deliveries from the delivery list on commit
* Each transaction has once instance of this per SCE
*
@@ -880,62 +839,51 @@
private class DeliveryCallback implements TxCallback
{
List delList = new ArrayList();
-
+
public void beforePrepare()
- {
+ {
//NOOP
}
-
+
public void beforeCommit(boolean onePhase)
- {
+ {
//NOOP
}
-
+
public void beforeRollback(boolean onePhase)
- {
+ {
//NOOP
}
-
+
public void afterPrepare()
- {
+ {
//NOOP
}
-
+
public synchronized void afterCommit(boolean onePhase) throws TransactionException
{
// Remove the deliveries from the delivery map.
-
- synchronized (lock)
+ Iterator iter = delList.iterator();
+ while (iter.hasNext())
{
- if (closed)
- {
- //We must throw an exception to the client otherwise they may think the commit has succeeded
- //This can happen if the connection is closed by another thread - e.g. the leasePinger
- throw new TransactionException("Failed to acknowledge message, session has been closed");
- }
+ Long messageID = (Long)iter.next();
- Iterator iter = delList.iterator();
- while (iter.hasNext())
+ if (deliveries.remove(messageID) == null)
{
- Long messageID = (Long)iter.next();
-
- if (deliveries.remove(messageID) == null)
- {
- throw new TransactionException("Failed to remove delivery " + messageID);
- }
+ throw new TransactionException("Failed to remove delivery " + messageID);
}
}
}
-
+
public void afterRollback(boolean onePhase) throws TransactionException
- {
+ {
//NOOP
}
-
+
synchronized void addMessageID(long messageID)
{
delList.add(new Long(messageID));
}
}
-
+
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-01 19:37:10 UTC (rev 1672)
@@ -278,29 +278,8 @@
public void cancel(Delivery d) throws Throwable
{
- // We put the cancellation on the event queue
- // try
- // {
- // Future result = new Future();
- //
- // this.executor.execute(new CancelRunnable(d, result));
- //
- // //For now we wait for result, but this may not be necessary
- // result.getResult();
- // }
- // catch (InterruptedException e)
- // {
- // log.warn("Thread interrupted", e);
- // }
-
-// Exception e = new Exception();
-//
-// log.error("cancelling delivery: " + d, e);
-//
-
-
- // TODO We should also consider executing cancels on the event queue
- cancelInternal(d);
+ //We put the cancellation on the event queue
+ this.executor.execute(new CancelRunnable(d));
}
// Distributor implementation ------------------------------------
@@ -645,24 +624,23 @@
// The iterator is used to iterate through the refs in the channel in the case that they
// don't match the selectors of any receivers.
ListIterator iter = null;
-
+
MessageReference ref = null;
-
+
while (true)
- {
+ {
synchronized (refLock)
- {
+ {
if (iter == null)
{
- //ref = (MessageReference) messageRefs.peekFirst();
- ref = removeFirstInMemory();
+ ref = (MessageReference) messageRefs.peekFirst();
}
else
{
if (iter.hasNext())
- {
+ {
ref = (MessageReference)iter.next();
- }
+ }
else
{
ref = null;
@@ -672,10 +650,9 @@
if (ref != null)
{
- if (trace) { log.trace(this + " pushing " + ref); }
-
- // Check if message is expired (we also do this on the client side). If so ack it
- // from the channel.
+ // Check if message is expired (we also do this on the client
+ // side)
+ // If so ack it from the channel
if (ref.isExpired())
{
if (trace) { log.trace("Message reference: " + ref + " has expired"); }
@@ -683,8 +660,7 @@
// remove and acknowledge it
if (iter == null)
{
- //already removed
- //removeFirstInMemory();
+ removeFirstInMemory();
}
else
{
@@ -711,28 +687,15 @@
if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
receiversReady = false;
-
- if (iter == null)
- {
- // add the message back
- synchronized (refLock)
- {
- messageRefs.addFirst(ref, ref.getPriority());
- }
- }
- else
- {
- //we didn't remove it in the first place
- }
-
+
return;
}
else if (!del.isSelectorAccepted())
{
// No receiver accepted the message because no selectors matched, so we create
// an iterator (if we haven't already created it) to iterate through the refs
- // in the channel.
-
+ // in the channel.
+
// TODO Note that this is only a partial solution since if there are messages
// paged to storage it won't try those - i.e. it will only iterate through
// those refs in memory. Dealing with refs in storage is somewhat tricky since
@@ -742,27 +705,15 @@
// indexes here to prevent having to iterate through all the refs every time.
// Having said all that, having consumers on a queue that don't match many
// messages is an antipattern and should be avoided by the user.
-
if (iter == null)
{
- //Add the message back
-
- synchronized (refLock)
- {
- messageRefs.addFirst(ref, ref.getPriority());
-
- iter = messageRefs.iterator();
-
- //And skip the first one (that's the one we just added back)
-
- iter.next();
- }
- }
+ iter = messageRefs.iterator();
+ }
}
else
{
if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-
+
// Receiver accepted the reference
// We must synchronize here to cope with another race condition where message
@@ -776,9 +727,11 @@
// FIXME - It's actually possible the delivery could be
// cancelled before it reaches
// here, in which case we wouldn't get a delivery but we
- // still need to increment the delivery count
+ // still need to increment the
+ // delivery count
// All the problems related to these race conditions and
- // fiddly edge cases will disappear once we do
+ // fiddly edge cases will disappear
+ // once we do
// http://jira.jboss.com/jira/browse/JBMESSAGING-355
// This will make life a lot easier
@@ -786,13 +739,11 @@
{
if (iter == null)
{
- //do nothing - already removed
- //removeFirstInMemory();
+ removeFirstInMemory();
}
else
{
- iter.remove();
- if (trace) { log.trace(this + " removed current message from iterator"); }
+ iter.remove();
}
// delivered
@@ -802,7 +753,6 @@
synchronized (deliveryLock)
{
deliveries.add(del);
- if (trace) { log.trace(this + " starting to track " + del); }
}
}
}
@@ -1734,7 +1684,29 @@
}
}
}
+
+ private class CancelRunnable implements Runnable
+ {
+ Delivery del;
+
+ CancelRunnable(Delivery del)
+ {
+ this.del = del;
+ }
+ public void run()
+ {
+ try
+ {
+ cancelInternal(del);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to cancel delivery", e);
+ }
+ }
+ }
+
private class HandleRunnable implements Runnable
{
Future result;
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-12-01 19:37:10 UTC (rev 1672)
@@ -386,7 +386,7 @@
ps.setLong(2, size);
- int rows = ps.executeUpdate();
+ int rows = updateWithRetry(ps);
if (trace)
{
@@ -418,7 +418,7 @@
ps.setString(2, counterName);
- int rows = ps.executeUpdate();
+ int rows = updateWithRetry(ps);
if (trace)
{
@@ -471,8 +471,6 @@
PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
- final int MAX_TRIES = 25;
-
try
{
conn = ds.getConnection();
@@ -484,41 +482,15 @@
ps.setLong(2, orderEnd);
ps.setLong(3, channelID);
-
- int tries = 0;
-
- while (true)
+
+ int rows = updateWithRetry(ps);
+
+ if (trace)
{
- try
- {
- int rows = ps.executeUpdate();
-
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(updateReliableRefs, new Long(channelID),
- new Long(orderStart), new Long(orderEnd))
- + " updated " + rows + " rows");
- }
- if (tries > 0)
- {
- log.warn("Update worked after retry");
- }
- break;
- }
- catch (SQLException e)
- {
- log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
- tries++;
- if (tries == MAX_TRIES)
- {
- log.error("Retried " + tries + " times, now giving up");
- throw new IllegalStateException("Failed to update references");
- }
- log.warn("Trying again after a pause");
- //Now we wait for a random amount of time to minimise risk of deadlock
- Thread.sleep((long)(Math.random() * 500));
- }
- }
+ log.trace(JDBCUtil.statementToString(updateReliableRefs, new Long(channelID),
+ new Long(orderStart), new Long(orderEnd))
+ + " updated " + rows + " rows");
+ }
}
catch (Exception e)
{
@@ -550,6 +522,8 @@
wrap.end();
}
}
+
+
public int getNumberOfUnloadedReferences(long channelID) throws Exception
{
@@ -859,7 +833,7 @@
}
else
{
- int rows = psInsertReference.executeUpdate();
+ int rows = updateWithRetry(psInsertReference);
if (trace)
{
@@ -916,7 +890,7 @@
{
if (added)
{
- int rows = psInsertMessage.executeUpdate();
+ int rows = updateWithRetry(psInsertMessage);
if (trace)
{
@@ -925,7 +899,7 @@
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = updateWithRetry(psUpdateMessage);
if (trace)
{
@@ -941,7 +915,7 @@
if (usingBatchUpdates)
{
- int[] rowsReference = psInsertReference.executeBatch();
+ int[] rowsReference = updateWithRetryBatch(psInsertReference);
if (trace)
{
@@ -950,7 +924,7 @@
if (messageInsertsInBatch)
{
- int[] rowsMessage = psInsertMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psInsertMessage);
if (trace)
{
@@ -959,7 +933,7 @@
}
if (messageUpdatesInBatch)
{
- int[] rowsMessage = psUpdateMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
if (trace)
{
@@ -1092,7 +1066,7 @@
}
else
{
- int rows = psDeleteReference.executeUpdate();
+ int rows = updateWithRetry(psDeleteReference);
if (trace)
{
@@ -1150,7 +1124,7 @@
{
if (removed)
{
- int rows = psDeleteMessage.executeUpdate();
+ int rows = updateWithRetry(psDeleteMessage);
if (trace)
{
@@ -1159,7 +1133,7 @@
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = updateWithRetry(psUpdateMessage);
if (trace)
{
@@ -1175,7 +1149,7 @@
if (usingBatchUpdates)
{
- int[] rowsReference = psDeleteReference.executeBatch();
+ int[] rowsReference = updateWithRetryBatch(psDeleteReference);
if (trace)
{
@@ -1184,7 +1158,7 @@
if (messageDeletionsInBatch)
{
- int[] rowsMessage = psDeleteMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psDeleteMessage);
if (trace)
{
@@ -1193,7 +1167,7 @@
}
if (messageUpdatesInBatch)
{
- int[] rowsMessage = psUpdateMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
if (trace)
{
@@ -1561,7 +1535,7 @@
}
else
{
- int rows = psUpdateReference.executeUpdate();
+ int rows = updateWithRetry(psUpdateReference);
if (trace)
{
@@ -1575,7 +1549,7 @@
if (usingBatchUpdates)
{
- int[] rowsReference = psUpdateReference.executeBatch();
+ int[] rowsReference = updateWithRetryBatch(psUpdateReference);
if (trace)
{
@@ -1652,7 +1626,7 @@
// Add the reference
addReference(channelID, ref, psReference, true);
- int rows = psReference.executeUpdate();
+ int rows = updateWithRetry(psReference);
if (trace)
{
@@ -1677,7 +1651,7 @@
updateMessageChannelCount(m, psMessage);
}
- rows = psMessage.executeUpdate();
+ rows = updateWithRetry(psMessage);
if (trace)
{
log.trace("Inserted/updated " + rows + " rows");
@@ -1757,7 +1731,7 @@
psReference.setLong(3, ref.getMessageID());
- int rows = psReference.executeUpdate();
+ int rows = updateWithRetry(psReference);
}
catch (Exception e)
{
@@ -1825,7 +1799,7 @@
//Remove the message reference
removeReference(channelID, ref, psReference);
- int rows = psReference.executeUpdate();
+ int rows = updateWithRetry(psReference);
if (trace)
{
@@ -1850,7 +1824,7 @@
updateMessageChannelCount(m, psMessage);
}
- rows = psMessage.executeUpdate();
+ rows = updateWithRetry(psMessage);
if (trace)
{
@@ -2747,7 +2721,7 @@
}
else
{
- int rows = psReference.executeUpdate();
+ int rows = updateWithRetry(psReference);
if (trace)
{
@@ -2802,7 +2776,7 @@
{
if (added)
{
- int rows = psInsertMessage.executeUpdate();
+ int rows = updateWithRetry(psInsertMessage);
if (trace)
{
@@ -2811,7 +2785,7 @@
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = updateWithRetry(psUpdateMessage);
if (trace)
{
@@ -2827,7 +2801,7 @@
if (batch)
{
- int[] rowsReference = psReference.executeBatch();
+ int[] rowsReference = updateWithRetryBatch(psReference);
if (trace)
{
@@ -2836,7 +2810,7 @@
if (messageInsertsInBatch)
{
- int[] rowsMessage = psInsertMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psInsertMessage);
if (trace)
{
@@ -2845,7 +2819,7 @@
}
if (messageUpdatesInBatch)
{
- int[] rowsMessage = psUpdateMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
if (trace)
{
@@ -2896,7 +2870,7 @@
}
else
{
- int rows = psReference.executeUpdate();
+ int rows = updateWithRetry(psReference);
if (trace)
{
@@ -2953,7 +2927,7 @@
{
if (removed)
{
- int rows = psDeleteMessage.executeUpdate();
+ int rows = updateWithRetry(psDeleteMessage);
if (trace)
{
@@ -2962,7 +2936,7 @@
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = updateWithRetry(psUpdateMessage);
if (trace)
{
@@ -2978,7 +2952,7 @@
if (batch)
{
- int[] rowsReference = psReference.executeBatch();
+ int[] rowsReference = updateWithRetryBatch(psReference);
if (trace)
{
@@ -2987,7 +2961,7 @@
if (messageDeletionsInBatch)
{
- int[] rowsMessage = psDeleteMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psDeleteMessage);
if (trace)
{
@@ -2996,7 +2970,7 @@
}
if (messageUpdatesInBatch)
{
- int[] rowsMessage = psUpdateMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
if (trace)
{
@@ -3190,7 +3164,7 @@
{
if (removed)
{
- int rows = psDeleteMessage.executeUpdate();
+ int rows = updateWithRetry(psDeleteMessage);
if (trace)
{
@@ -3199,7 +3173,7 @@
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = updateWithRetry(psUpdateMessage);
if (trace)
{
@@ -3217,7 +3191,7 @@
{
if (messageDeletionsInBatch)
{
- int[] rows = psDeleteMessage.executeBatch();
+ int[] rows = updateWithRetryBatch(psDeleteMessage);
if (trace)
{
@@ -3229,7 +3203,7 @@
}
if (messageUpdatesInBatch)
{
- int[] rows = psUpdateMessage.executeBatch();
+ int[] rows = updateWithRetryBatch(psUpdateMessage);
if (trace)
{
@@ -3364,7 +3338,7 @@
}
else
{
- int rows = psReference.executeUpdate();
+ int rows = updateWithRetry(psReference);
if (trace)
{
@@ -3419,7 +3393,7 @@
{
if (added)
{
- int rows = psInsertMessage.executeUpdate();
+ int rows = updateWithRetry(psInsertMessage);
if (trace)
{
@@ -3428,7 +3402,7 @@
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = updateWithRetry(psUpdateMessage);
if (trace)
{
@@ -3444,7 +3418,7 @@
if (batch)
{
- int[] rowsReference = psReference.executeBatch();
+ int[] rowsReference = updateWithRetryBatch(psReference);
if (trace)
{
@@ -3453,7 +3427,7 @@
if (messageInsertsInBatch)
{
- int[] rowsMessage = psInsertMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psInsertMessage);
if (trace)
{
@@ -3462,7 +3436,7 @@
}
if (messageUpdatesInBatch)
{
- int[] rowsMessage = psUpdateMessage.executeBatch();
+ int[] rowsMessage = updateWithRetryBatch(psUpdateMessage);
if (trace)
{
@@ -3505,7 +3479,7 @@
}
else
{
- int rows = psReference.executeUpdate();
+ int rows = updateWithRetry(psReference);
if (trace)
{
@@ -3519,7 +3493,7 @@
if (batch)
{
- int[] rows = psReference.executeBatch();
+ int[] rows = updateWithRetryBatch(psReference);
if (trace)
{
@@ -3689,7 +3663,7 @@
{
if (removed)
{
- int rows = psDeleteMessage.executeUpdate();
+ int rows = updateWithRetry(psDeleteMessage);
if (trace)
{
@@ -3698,7 +3672,7 @@
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = updateWithRetry(psUpdateMessage);
if (trace)
{
@@ -3716,7 +3690,7 @@
{
if (messageDeletionsInBatch)
{
- int[] rows = psDeleteMessage.executeBatch();
+ int[] rows = updateWithRetryBatch(psDeleteMessage);
if (trace)
{
@@ -3728,7 +3702,7 @@
}
if (messageUpdatesInBatch)
{
- int[] rows = psUpdateMessage.executeBatch();
+ int[] rows = updateWithRetryBatch(psUpdateMessage);
if (trace)
{
@@ -3820,7 +3794,7 @@
ps.setInt(3, formatID);
ps.setBytes(4, xid.getGlobalTransactionId());
- rows = ps.executeUpdate();
+ rows = updateWithRetry(ps);
}
finally
@@ -3845,7 +3819,7 @@
}
}
- protected void removeTXRecord(Connection conn, Transaction tx) throws SQLException
+ protected void removeTXRecord(Connection conn, Transaction tx) throws Exception
{
PreparedStatement ps = null;
try
@@ -3854,7 +3828,7 @@
ps.setLong(1, tx.getId());
- int rows = ps.executeUpdate();
+ int rows = updateWithRetry(ps);
if (trace)
{
@@ -3948,7 +3922,7 @@
ps.setLong(1, tx.getId());
- int rows = ps.executeUpdate();
+ int rows = updateWithRetry(ps);
if (trace)
{
@@ -3959,7 +3933,7 @@
ps = conn.prepareStatement(commitMessageRef2);
ps.setLong(1, tx.getId());
- rows = ps.executeUpdate();
+ rows = updateWithRetry(ps);
if (trace)
{
@@ -3994,7 +3968,7 @@
ps.setLong(1, tx.getId());
- int rows = ps.executeUpdate();
+ int rows = updateWithRetry(ps);
if (trace)
{
@@ -4006,7 +3980,7 @@
ps = conn.prepareStatement(rollbackMessageRef2);
ps.setLong(1, tx.getId());
- rows = ps.executeUpdate();
+ rows = updateWithRetry(ps);
if (trace)
{
@@ -4380,6 +4354,70 @@
log.trace("Batch update " + name + ", " + action + " total of " + count + " rows");
}
+ protected int updateWithRetry(PreparedStatement ps) throws Exception
+ {
+ return updateWithRetry(ps, false)[0];
+ }
+
+ protected int[] updateWithRetryBatch(PreparedStatement ps) throws Exception
+ {
+ return updateWithRetry(ps, true);
+ }
+
+ private int[] updateWithRetry(PreparedStatement ps, boolean batch) throws Exception
+ {
+ final int MAX_TRIES = 25;
+
+ int rows = 0;
+
+ int[] rowsArr = null;
+
+ int tries = 0;
+
+ while (true)
+ {
+ try
+ {
+ if (batch)
+ {
+ rowsArr = ps.executeBatch();
+ }
+ else
+ {
+ rows = ps.executeUpdate();
+ }
+
+ if (tries > 0)
+ {
+ log.warn("Update worked after retry");
+ }
+ break;
+ }
+ catch (SQLException e)
+ {
+ log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+ tries++;
+ if (tries == MAX_TRIES)
+ {
+ log.error("Retried " + tries + " times, now giving up");
+ throw new IllegalStateException("Failed to update references");
+ }
+ log.warn("Trying again after a pause");
+ //Now we wait for a random amount of time to minimise risk of deadlock
+ Thread.sleep((long)(Math.random() * 500));
+ }
+ }
+
+ if (batch)
+ {
+ return rowsArr;
+ }
+ else
+ {
+ return new int[] { rows };
+ }
+ }
+
// Private -------------------------------------------------------
// never access directly
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-01 09:47:27 UTC (rev 1671)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-01 19:37:10 UTC (rev 1672)
@@ -895,6 +895,9 @@
cons1.close();
+ //Cancelling is asynch so can take some time
+ Thread.sleep(500);
+
//rollback should cause redelivery of messages
//in this case redelivery occurs to a different receiver
More information about the jboss-cvs-commits
mailing list