[jboss-cvs] JBoss Messaging SVN: r8371 - in branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026: src/main/org/jboss/jms/client/container and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 23 19:39:21 EDT 2011
Author: jbertram
Date: 2011-06-23 19:39:20 -0400 (Thu, 23 Jun 2011)
New Revision: 8371
Modified:
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
SOA-3026
Property changes on: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026
___________________________________________________________________
Modified: svn:mergeinfo
- /branches/Branch_1_4:7883-7884,7887-7892,7900,7906,7912-7914,7916,7930-7934,7936-7937,7941-7944,7962-7964,7966,7968-7971,7978-7979,7996,7999,8013,8060,8083,8114,8133-8134,8138,8141-8142,8154-8155,8157-8158,8160,8233-8234,8236,8256,8312-8313,8318,8323,8356,8360-8361
/branches/JBM1842:8169-8232
+ /branches/Branch_1_4:7883-7884,7887-7892,7900,7906,7912-7914,7916,7930-7934,7936-7937,7941-7944,7962-7964,7966,7968-7971,7978-7979,7996,7999,8013,8060,8083,8114,8133-8134,8138,8141-8142,8154-8155,8157-8158,8160,8233-8234,8236,8256,8312-8313,8318,8323,8333,8356,8360-8361
/branches/JBM1842:8169-8232
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -37,6 +37,7 @@
import javax.jms.MessageListener;
import javax.jms.Session;
+import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.DefaultCancel;
@@ -145,7 +146,7 @@
}
DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+ new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, null);
m.incDeliveryCount();
@@ -213,7 +214,7 @@
}
DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+ new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
m.incDeliveryCount();
@@ -224,7 +225,10 @@
//We need to call preDeliver, deliver the message then call postDeliver - this is because
//it is legal to call session.recover(), or session.rollback() from within the onMessage()
//method in which case the last message needs to be delivered so it needs to know about it
- sess.preDeliver(deliveryInfo);
+ if (!sess.preDeliver(deliveryInfo))
+ {
+ return;
+ }
}
try
@@ -296,6 +300,17 @@
private boolean firstTime = true;
private volatile Thread onMessageThread;
private ExecutorService pool = Executors.newCachedThreadPool();
+
+ //JBMESSAGING-1878 (case 1)
+ //First, the buffer needs to be synchronized between
+ //failover clearing and message adding.
+ //When messageSource changes, all messages from the old messageSource
+ //will be discarded, as they will be redelivered.
+ //We simply take the CallbackManager as the messageSource because
+ //every new connection creates a new CallbackManager.
+ private Object consumerLock;
+ private Object messageSource;
+ private boolean isClustered = false;
//JBMESSAGING-1876
private long minTimeoutProcessTime;
@@ -336,6 +351,12 @@
this.shouldAck = shouldAck;
this.redeliveryDelay = redeliveryDelay;
this.minTimeoutProcessTime = minTimeoutProcessTime;
+ this.isClustered = isClustered;
+ if (isClustered)
+ {
+ consumerLock = new Object();
+ }
+ messageSource = cbManager;
}
// Public ---------------------------------------------------------------------------------------
@@ -350,7 +371,7 @@
*
* @param message The message
*/
- public void handleMessage(final Object message) throws Exception
+ public void handleMessage(final Object message, CallbackManager cbManager) throws Exception
{
ClientDelivery del = (ClientDelivery)message;
@@ -366,6 +387,8 @@
{
proxy.setJMSDestination(msg.getOriginalSuckerDestination());
}
+
+ proxy.setSource(cbManager);
//TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
// failover where a message is sent then the valve is locked, and the message send cause
@@ -561,19 +584,21 @@
if (!isConnectionConsumer && !ignore)
{
- final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
+ final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, m.getSource());
if (timeout <= 0 || sessionDelegate.getTransacted())
{
- sessionDelegate.preDeliver(info);
+ ignore = ! sessionDelegate.preDeliver(info);
// If post deliver didn't succeed and acknowledgement mode is auto_ack
// That means the ref wasn't acked since it couldn't be found.
// In order to maintain at most once semantics we must therefore not return
// the message
-
- ignore = !sessionDelegate.postDeliver();
+ if (!ignore)
+ {
+ ignore = !sessionDelegate.postDeliver();
+ }
}
else
{
@@ -582,7 +607,10 @@
{
public Boolean call() throws Exception
{
- sessionDelegate.preDeliver(info);
+ if (! sessionDelegate.preDeliver(info))
+ {
+ return true;
+ }
// If post deliver didn't succeed and acknowledgement mode is auto_ack
// That means the ref wasn't acked since it couldn't be found.
@@ -701,6 +729,9 @@
{
synchronized (mainLock)
{
+ //Because this is a local redelivery, we update the source so the message can be put into the ack list again.
+ proxy.setSource(this.messageSource);
+
buffer.addFirst(proxy, proxy.getJMSPriority());
consumeCount--;
@@ -719,12 +750,19 @@
currentToken++;
consumerID = newHandler.consumerID;
+
+ //must be clustered
+ synchronized(consumerLock)
+ {
+ //This prevents 'old' messages from coming in; see JBMESSAGING-1878.
+ this.messageSource = newHandler.messageSource;
- // Clear the buffer. This way the non persistent messages that managed to arrive are
- // irredeemably lost, while the persistent ones are failed-over on the server and will be
- // resent
+ // Clear the buffer. This way the non persistent messages that managed to arrive are
+ // irredeemably lost, while the persistent ones are failed-over on the server and will be
+ // resent
- buffer.clear();
+ buffer.clear();
+ }
consumeCount = 0;
}
@@ -1052,14 +1090,39 @@
proxy.getMessage().doBeforeReceive();
- //Add it to the buffer
- buffer.addLast(proxy, proxy.getJMSPriority());
+ if (isClustered)
+ {
+ synchronized (consumerLock)
+ {
+ //if source changed, discard it.
+ if (proxy.getSource() == messageSource)
+ {
+ //Add it to the buffer
+ buffer.addLast(proxy, proxy.getJMSPriority());
- lastDeliveryId = proxy.getDeliveryId();
+ lastDeliveryId = proxy.getDeliveryId();
+
+ if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+ messageAdded();
+ }
+ else
+ {
+ log.debug("Discarding message from old source " + proxy.getSource() + " on to new source " + messageSource);
+ }
+ }
+ }
+ else
+ {
+ //Add it to the buffer
+ buffer.addLast(proxy, proxy.getJMSPriority());
+
+ lastDeliveryId = proxy.getDeliveryId();
- if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+ if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
- messageAdded();
+ messageAdded();
+ }
}
}
catch (Exception e)
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -27,6 +27,7 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.client.state.ConnectionState;
@@ -89,6 +90,8 @@
int maxDeliveries = consumerState.getMaxDeliveries();
long redeliveryDelay = consumerState.getRedeliveryDelay();
+ FailoverCommandCenter fcc = connectionState.getFailoverCommandCenter();
+
//We need the queue name for recovering any deliveries after failover
String queueName = null;
if (consumerState.getSubscriptionName() != null)
@@ -109,6 +112,8 @@
boolean autoFlowControl = ((Boolean)mi.getArguments()[5]).booleanValue();
+ CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+
ClientConsumer messageHandler =
new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
sessionDelegate, consumerDelegate, consumerID, queueName,
@@ -118,7 +123,6 @@
sessionState.addCallbackHandler(messageHandler);
- CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.registerHandler(consumerID, messageHandler);
consumerState.setClientConsumer(messageHandler);
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -234,6 +234,8 @@
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
+ boolean result = true;
+
synchronized (state)
{
@@ -254,8 +256,9 @@
throw new IllegalStateException(
"CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
}
-
- state.getClientAckList().add(info);
+
+ result = state.addToClientAckList(info);
+
}
// if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
// However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
@@ -300,12 +303,20 @@
String sessionId = connectionConsumerDelegate != null ?
connectionConsumerDelegate.getID() : state.getSessionID();
- connState.getResourceManager().addAck(txID, sessionId, info);
+ if (info.getSource() != null)
+ {
+ //from a normal session (non CC).
+ result = state.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
+ }
+ else
+ {
+ connState.getResourceManager().addAck(txID, sessionId, info);
+ }
}
}
}
- return null;
+ return Boolean.valueOf(result);
}
public Object handlePostDeliver(Invocation invocation) throws Throwable
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -382,7 +382,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void preDeliver(DeliveryInfo deliveryInfo) throws JMSException
+ public boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -97,7 +97,7 @@
try
{
- handler.handleMessage(dr);
+ handler.handleMessage(dr, this);
}
catch (Exception e)
{
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.Executors;
+import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -128,6 +129,11 @@
private boolean isCC;
+ private Object ackLock = new Object();
+
+ private Object ackSource;
+
+
// Constructors ---------------------------------------------------------------------------------
public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -172,6 +178,8 @@
this.setEnableOrderingGroup(enableOrderingGroup);
this.setDefaultOrderingGroupName(defaultOrderingGroupName);
+
+ this.ackSource = parent.getRemotingConnection().getCallbackManager();
}
// HierarchicalState implementation -------------------------------------------------------------
@@ -236,6 +244,14 @@
// from before failover waiting in there and we don't want them to get delivered after
// failover.
executor.clearAllExceptCurrentTask();
+
+ //This guards against new ack info coming into the list. It should happen before ClientConsumer.synchronizedWith();
+ //otherwise a message can be added to the buffer after the buffer is cleared, and then added to the ack list.
+ //JBMESSAGING-1878
+ synchronized (ackLock)
+ {
+ ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
+ }
ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
@@ -292,7 +308,7 @@
ConnectionState connState = (ConnectionState)getParent();
ResourceManager rm = connState.getResourceManager();
-
+
// We need to failover from one session ID to another in the resource manager
rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
@@ -407,6 +423,23 @@
return clientAckList;
}
+ public boolean addToClientAckList(DeliveryInfo info)
+ {
+ synchronized (ackLock)
+ {
+ if (ackSource == info.getSource())
+ {
+ clientAckList.add(info);
+ return true;
+ }
+ else
+ {
+ log.debug("Rejecting ack " + info + " from old source: " + info.getSource() + " on new source " + ackSource);
+ return false;
+ }
+ }
+ }
+
public void setClientAckList(List list)
{
this.clientAckList = list;
@@ -580,5 +613,22 @@
return parent.getMinTimeoutProcessTime();
}
+ public boolean addAckToResourceManager(ResourceManager rm, Object txID, String sessId, DeliveryInfo info) throws JMSException
+ {
+ synchronized (ackLock)
+ {
+ if (ackSource == info.getSource())
+ {
+ rm.addAck(txID, sessId, info);
+ return true;
+ }
+ else
+ {
+ log.debug("Rejecting tx ack " + info + " from old source " + info.getSource() + " on new source " + ackSource);
+ return false;
+ }
+ }
+ }
+
}
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/DeliveryInfo.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -58,12 +58,15 @@
//to the connection consumer's session, otherwise it will be null
private SessionDelegate connectionConsumerSession;
+ //Marks where the message was delivered from (a CallbackManager)
+ private Object source;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public DeliveryInfo(MessageProxy msg, String consumerId, String queueName,
- SessionDelegate connectionConsumerSession, boolean shouldAck)
+ SessionDelegate connectionConsumerSession, boolean shouldAck, Object source)
{
this.msg = msg;
@@ -74,6 +77,8 @@
this.connectionConsumerSession = connectionConsumerSession;
this.shouldAck = shouldAck;
+
+ this.source = source;
}
// Public --------------------------------------------------------
@@ -103,6 +108,11 @@
return shouldAck;
}
+ public Object getSource()
+ {
+ return source;
+ }
+
public String toString()
{
return "Delivery[" + getDeliveryID() + ", " + msg + "]";
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -64,7 +64,7 @@
TextMessageProxy createTextMessage(String text) throws JMSException;
- void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
+ boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
boolean postDeliver() throws JMSException;
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/message/MessageProxy.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -95,6 +95,8 @@
protected JBossMessage message;
+
+ private Object source;
// Constructors --------------------------------------------------
@@ -506,5 +508,15 @@
needToCopyHeader = false;
}
+ public void setSource(Object source)
+ {
+ this.source = source;
+ }
+
+ public Object getSource()
+ {
+ return this.source;
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -131,7 +131,7 @@
boolean strictTck,
boolean sendAcksAsync,
boolean enableOrderingGroup,
- String defaultOrderingGroupName)
+ String defaultOrderingGroupName,
long minTimeoutProcessTime)
throws Exception
{
Modified: branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2011-06-23 21:33:48 UTC (rev 8370)
+++ branches/Branch_JBossMessaging_1_4_6_GA_SOA-3026/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2011-06-23 23:39:20 UTC (rev 8371)
@@ -643,7 +643,7 @@
{
assertRemainingMessages(NUM_MESSAGES - i);
- m = consumer.receive(200);
+ m = consumer.receive(2000);
assertRemainingMessages(NUM_MESSAGES - (i + 1));
@@ -663,7 +663,7 @@
log.trace("Session recover called");
- m = consumer.receive(200);
+ m = consumer.receive(2000);
log.trace("Message is:" + m);
@@ -755,7 +755,7 @@
Message m = null;
for (int i = 0; i < 10; i++)
{
- m = consumer.receive(200);
+ m = consumer.receive(2000);
assertNotNull(m);
@@ -771,7 +771,7 @@
for (int i = 0; i < 9; i++)
{
- m = consumer.receive(200);
+ m = consumer.receive(2000);
assertNotNull(m);
More information about the jboss-cvs-commits
mailing list