[jboss-cvs] JBoss Messaging SVN: r3235 - in trunk: src/main/org/jboss/jms/server/endpoint and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Oct 21 09:46:18 EDT 2007
Author: timfox
Date: 2007-10-21 09:46:15 -0400 (Sun, 21 Oct 2007)
New Revision: 3235
Modified:
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
Log:
Only replicate transacted or client ack sessions
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-10-21 13:46:15 UTC (rev 3235)
@@ -285,12 +285,12 @@
// to AUTO_ACKNOWLEDGE)
log.trace(this + " is not transacted (or XA with no transaction set), " +
- "retrieving deliveries from session state");
+ "retrieving deliveries from session state");
// We remove any unacked non-persistent messages - this is because we don't want to ack
// them since the server won't know about them and will get confused
- if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
{
for(Iterator i = getClientAckList().iterator(); i.hasNext(); )
{
@@ -352,7 +352,12 @@
//like remove from recovery Area refs corresponding to messages in client consumer buffers
log.trace(this + " sending delivery recovery " + recoveryInfos + " on failover");
- newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
+
+ //Note we only recover sessions that are transacted or client ack
+ if (transacted || xa || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
+ }
}
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-10-21 13:46:15 UTC (rev 3235)
@@ -33,6 +33,7 @@
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
+import javax.jms.Session;
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
@@ -239,8 +240,11 @@
// create the corresponding server-side session endpoint and register it with this
// connection endpoint instance
- ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this);
+ //Note we only replicate transacted and client acknowledge sessions.
+ ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this,
+ transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE);
+
synchronized (sessions)
{
sessions.put(sessionID, ep);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-21 13:46:15 UTC (rev 3235)
@@ -162,6 +162,7 @@
private Queue defaultDLQ;
private Queue defaultExpiryQueue;
private boolean supportsFailover;
+ private boolean replicating;
private Object deliveryLock = new Object();
@@ -181,12 +182,15 @@
// Constructors ---------------------------------------------------------------------------------
- ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
+ ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
+ boolean replicating) throws Exception
{
this.id = sessionID;
this.connectionEndpoint = connectionEndpoint;
+ this.replicating = replicating;
+
callbackHandler = connectionEndpoint.getCallbackHandler();
sp = connectionEndpoint.getServerPeer();
@@ -465,8 +469,11 @@
{
Delivery del = cancelDeliveryInternal(cancel);
- //Prompt delivery
- promptDelivery((Channel)del.getObserver());
+ if (del != null)
+ {
+ //Prompt delivery
+ promptDelivery((Channel)del.getObserver());
+ }
}
catch (Throwable t)
{
@@ -492,7 +499,10 @@
Delivery del = cancelDeliveryInternal(cancel);
- channels.add(del.getObserver());
+ if (del != null)
+ {
+ channels.add(del.getObserver());
+ }
}
if (trace) { log.trace("Cancelled deliveries"); }
@@ -920,7 +930,7 @@
boolean gotSome = false;
- if (!firstNode)
+ if (!firstNode && replicating)
{
if (trace) { log.trace("Now collecting"); }
@@ -1348,7 +1358,9 @@
Message message = delivery.getReference().getMessage();
- if (!consumer.isReplicating())
+ //Note that we only replicate transacted or client acknowledge sessions
+ //There is no point in replicating AUTO_ACK or DUPS_OK
+ if (!consumer.isReplicating() || !replicating)
{
if (trace) { log.trace(this + " doing the delivery straight away"); }
@@ -1588,7 +1600,13 @@
if (rec == null)
{
- throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
+ //The delivery might not be found if the session is not replicated (i.e. auto_ack or dups_ok)
+ //and has failed over, since recoverDeliveries won't have been called
+ if (trace)
+ {
+ log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
+ }
+ return null;
}
//Note we check the flag *and* evaluate again, this is because the server and client clocks may
@@ -1730,7 +1748,10 @@
if (rec == null)
{
- log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before ");
+ log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
+
+ log.info("**** CANNOT FIND ACK TO ACKNOWLEDGED");
+
return;
}
@@ -1738,7 +1759,7 @@
//Now replicate the ack
- if (rec.replicating)
+ if (rec.replicating && replicating)
{
postOffice.sendReplicateAckMessage(rec.queueName, rec.del.getReference().getMessage().getMessageID());
}
Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-10-21 11:51:18 UTC (rev 3234)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java 2007-10-21 13:46:15 UTC (rev 3235)
@@ -148,7 +148,21 @@
Integer messageCount = (Integer)ServerManagement.getServer(server).getAttribute(destObjectName, "MessageCount");
- assertEquals(0, messageCount.intValue());
+ if (messageCount.intValue() != 0)
+ {
+ //Now delete it - prevents other tests from failing
+ try
+ {
+ this.removeAllMessages(queue.getQueueName(), true, server);
+ }
+ catch (Exception e)
+ {
+ log.debug("Failed to remove all messages", e);
+ }
+
+ fail("Queue " + queue.getQueueName() + " is not empty");
+ }
+
}
protected void checkEmpty(Topic topic) throws Exception
More information about the jboss-cvs-commits mailing list