[jboss-cvs] JBoss Messaging SVN: r1674 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/server/endpoint tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 1 15:55:33 EST 2006
Author: timfox
Date: 2006-12-01 15:55:27 -0500 (Fri, 01 Dec 2006)
New Revision: 1674
Modified:
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
More on fix
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-01 20:09:48 UTC (rev 1673)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-01 20:55:27 UTC (rev 1674)
@@ -222,15 +222,8 @@
// channel for delivery later.
if (!started)
{
- // this is a common programming error, make this visible in the debug logs. However,
- // make also possible to cut out the performance overhead for systems that raise the
- // threshold to INFO or higher.
+ if (trace) { log.debug(this + " NOT started yet!"); }
- if (log.isDebugEnabled())
- {
- log.debug(this + " NOT started yet!");
- }
-
return null;
}
@@ -529,22 +522,27 @@
{
if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
- boolean wereDeliveries = false;
- for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
- {
- SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
-
- d.cancel();
- wereDeliveries = true;
- }
- deliveries.clear();
+ boolean wereDeliveries = false;
- if (!disconnected)
- {
- if (!closed)
+ synchronized (lock)
+ {
+ for (Iterator i = deliveries.values().iterator(); i.hasNext(); )
{
- close();
+ SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
+
+ d.cancel();
+ wereDeliveries = true;
}
+
+ deliveries.clear();
+
+ if (!disconnected)
+ {
+ if (!closed)
+ {
+ close();
+ }
+ }
}
sessionEndpoint.getConnectionEndpoint().
@@ -599,8 +597,13 @@
protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
{
- SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
+ SingleReceiverDelivery del = null;
+ synchronized (lock)
+ {
+ del = (SingleReceiverDelivery)deliveries.remove(messageID);
+ }
+
if (del != null)
{
//Cancel back to the queue
@@ -655,10 +658,10 @@
{
return;
}
+ }
- started = false;
- }
-
+ started = false;
+
//Now we know no more messages will be accepted in the SCE
try
@@ -687,22 +690,19 @@
if (!toDeliver.isEmpty())
{
- synchronized (lock)
+ for (int i = toDeliver.size() - 1; i >= 0; i--)
{
- for (int i = toDeliver.size() - 1; i >= 0; i--)
- {
- MessageProxy proxy = (MessageProxy)toDeliver.get(i);
-
- long id = proxy.getMessage().getMessageID();
-
- cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
- }
+ MessageProxy proxy = (MessageProxy)toDeliver.get(i);
+
+ long id = proxy.getMessage().getMessageID();
+
+ cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
}
-
+
toDeliver.clear();
bufferFull = false;
- }
+ }
}
// Private -------------------------------------------------------
@@ -862,15 +862,17 @@
public synchronized void afterCommit(boolean onePhase) throws TransactionException
{
- // Remove the deliveries from the delivery map.
- Iterator iter = delList.iterator();
- while (iter.hasNext())
+ synchronized (lock)
{
- Long messageID = (Long)iter.next();
-
- if (deliveries.remove(messageID) == null)
+ // Remove the deliveries from the delivery map.
+ Iterator iter = delList.iterator();
+ while (iter.hasNext())
{
- throw new TransactionException("Failed to remove delivery " + messageID);
+ Long messageID = (Long)iter.next();
+
+ //The message might have been removed already if the consumer was closed for instance
+ //This is ok
+ deliveries.remove(messageID);
}
}
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-01 20:09:48 UTC (rev 1673)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-01 20:55:27 UTC (rev 1674)
@@ -384,7 +384,7 @@
}
}
- public void close() throws JMSException
+ public synchronized void close() throws JMSException
{
try
{
@@ -438,10 +438,15 @@
}
}
- public void acknowledgeBatch(List ackInfos) throws JMSException
+ public synchronized void acknowledgeBatch(List ackInfos) throws JMSException
{
try
{
+ if (closed)
+ {
+ throw new javax.jms.IllegalStateException("Cannot acknowledge batch since session is closed");
+ }
+
Iterator iter = ackInfos.iterator();
while (iter.hasNext())
@@ -457,10 +462,15 @@
}
}
- public void acknowledge(AckInfo ackInfo) throws JMSException
+ public synchronized void acknowledge(AckInfo ackInfo) throws JMSException
{
try
{
+ if (closed)
+ {
+ throw new javax.jms.IllegalStateException("Cannot acknowledge " + ackInfo + " since session is closed");
+ }
+
acknowledgeInternal(ackInfo);
}
catch (Throwable t)
@@ -469,10 +479,19 @@
}
}
- public void cancelDeliveries(List ackInfos) throws JMSException
+ public synchronized void cancelDeliveries(List ackInfos) throws JMSException
{
try
{
+ if (closed)
+ {
+ //We can safely ignore any cancels since closing the session will have cancelled anything
+ //remaining
+ log.warn("Call to cancelDeliveries came in after session was closed");
+
+ return;
+ }
+
// deliveries must be cancelled in reverse order
Set consumers = new HashSet();
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-01 20:09:48 UTC (rev 1673)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-01 20:55:27 UTC (rev 1674)
@@ -1012,6 +1012,9 @@
assertEquals("hello1", rm1.getText());
cons1.close();
+
+ //Give time for asynch cancel to happen
+ Thread.sleep(500);
log.debug("sess.recover()");
More information about the jboss-cvs-commits
mailing list