[jboss-cvs] JBoss Messaging SVN: r8327 - in branches/JBMESSAGING_1878/src/main/org/jboss/jms: client/delegate and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Jun 12 23:34:20 EDT 2011
Author: gaohoward
Date: 2011-06-12 23:34:20 -0400 (Sun, 12 Jun 2011)
New Revision: 8327
Modified:
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/remoting/CallbackManager.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/DeliveryInfo.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/SessionDelegate.java
Log:
Fixed the client_ack case
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -37,6 +37,7 @@
import javax.jms.MessageListener;
import javax.jms.Session;
+import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.DefaultCancel;
@@ -145,7 +146,7 @@
}
DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+ new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, null);
m.incDeliveryCount();
@@ -213,7 +214,7 @@
}
DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+ new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, messageSource);
m.incDeliveryCount();
@@ -224,7 +225,10 @@
//We need to call preDeliver, deliver the message then call postDeliver - this is because
//it is legal to call session.recover(), or session.rollback() from within the onMessage()
//method in which case the last message needs to be delivered so it needs to know about it
- sess.preDeliver(deliveryInfo);
+ if (!sess.preDeliver(deliveryInfo))
+ {
+ return;
+ }
}
try
@@ -299,6 +303,17 @@
private long maxRetryChangeRate;
private long retryChangeRateInterval;
private boolean abortReceive;
+
+ //JBMESSAGING-1878 (case 1)
+ //First, the buffer needs to be synchronized between
+ //failover clearing and message adding.
+ //When messageSource changes, all messages from the old messageSource
+ //will be discarded, as they will be redelivered.
+ //We simply take the CallbackManager as the messageSource because
+ //every new connection creates a new CallbackManager.
+ private Object consumerLock;
+ private Object messageSource;
+ private boolean isClustered = false;
public int getBufferSize()
{
@@ -314,7 +329,9 @@
int maxDeliveries, boolean shouldAck,
long redeliveryDelay,
long maxRetryChangeRate,
- long retryChangeRateInterval)
+ long retryChangeRateInterval,
+ boolean isClustered,
+ CallbackManager cbManager)
{
if (bufferSize < 1)
{
@@ -336,6 +353,12 @@
this.redeliveryDelay = redeliveryDelay;
this.maxRetryChangeRate = maxRetryChangeRate;
this.retryChangeRateInterval = retryChangeRateInterval;
+ this.isClustered = isClustered;
+ if (isClustered)
+ {
+ consumerLock = new Object();
+ }
+ messageSource = cbManager;
}
// Public ---------------------------------------------------------------------------------------
@@ -350,7 +373,7 @@
*
* @param message The message
*/
- public void handleMessage(final Object message) throws Exception
+ public void handleMessage(final Object message, CallbackManager cbManager) throws Exception
{
ClientDelivery del = (ClientDelivery)message;
@@ -372,7 +395,7 @@
// a message delivery back to the same client which tries to ack but can't get through
// the valve. This won't be necessary when we move to a non blocking transport
- sessionExecutor.execute(new HandleMessageRunnable(currentToken, proxy));
+ sessionExecutor.execute(new HandleMessageRunnable(currentToken, proxy, cbManager));
}
public void setMessageListener(MessageListener listener) throws JMSException
@@ -561,19 +584,21 @@
if (!isConnectionConsumer && !ignore)
{
- final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck);
+ final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, messageSource);
if (timeout <= 0)
{
- sessionDelegate.preDeliver(info);
+ ignore = ! sessionDelegate.preDeliver(info);
// If post deliver didn't succeed and acknowledgement mode is auto_ack
// That means the ref wasn't acked since it couldn't be found.
// In order to maintain at most once semantics we must therefore not return
// the message
-
- ignore = !sessionDelegate.postDeliver();
+ if (!ignore)
+ {
+ ignore = !sessionDelegate.postDeliver();
+ }
}
else
{
@@ -582,7 +607,10 @@
{
public Boolean call() throws Exception
{
- sessionDelegate.preDeliver(info);
+ if (! sessionDelegate.preDeliver(info))
+ {
+ return true;
+ }
// If post deliver didn't succeed and acknowledgement mode is auto_ack
// That means the ref wasn't acked since it couldn't be found.
@@ -716,12 +744,19 @@
currentToken++;
consumerID = newHandler.consumerID;
+
+ //must be clustered
+ synchronized(consumerLock)
+ {
+ //this prevents 'old' messages from coming in, see JBMESSAGING-1878
+ this.messageSource = newHandler.messageSource;
- // Clear the buffer. This way the non persistent messages that managed to arrive are
- // irredeemably lost, while the persistent ones are failed-over on the server and will be
- // resent
+ // Clear the buffer. This way the non persistent messages that managed to arrive are
+ // irredeemably lost, while the persistent ones are failed-over on the server and will be
+ // resent
- buffer.clear();
+ buffer.clear();
+ }
consumeCount = 0;
}
@@ -1063,11 +1098,15 @@
private MessageProxy proxy;
- HandleMessageRunnable(int token, MessageProxy proxy)
+ private Object source;
+
+ HandleMessageRunnable(int token, MessageProxy proxy, Object source)
{
this.token = token;
this.proxy = proxy;
+
+ this.source = source;
}
public void run()
@@ -1096,14 +1135,35 @@
proxy.getMessage().doBeforeReceive();
- //Add it to the buffer
- buffer.addLast(proxy, proxy.getJMSPriority());
+ if (isClustered)
+ {
+ synchronized (consumerLock)
+ {
+ //if source changed, discard it.
+ if (source == messageSource)
+ {
+ //Add it to the buffer
+ buffer.addLast(proxy, proxy.getJMSPriority());
- lastDeliveryId = proxy.getDeliveryId();
+ lastDeliveryId = proxy.getDeliveryId();
+
+ if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+
+ messageAdded();
+ }
+ }
+ }
+ else
+ {
+ //Add it to the buffer
+ buffer.addLast(proxy, proxy.getJMSPriority());
+
+ lastDeliveryId = proxy.getDeliveryId();
- if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
+ if (trace) { log.trace(this + " added message(s) to the buffer are now " + buffer.size() + " messages"); }
- messageAdded();
+ messageAdded();
+ }
}
}
catch (Exception e)
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -27,6 +27,7 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.client.state.ConnectionState;
@@ -89,6 +90,8 @@
int maxDeliveries = consumerState.getMaxDeliveries();
long redeliveryDelay = consumerState.getRedeliveryDelay();
+ FailoverCommandCenter fcc = connectionState.getFailoverCommandCenter();
+
//We need the queue name for recovering any deliveries after failover
String queueName = null;
if (consumerState.getSubscriptionName() != null)
@@ -109,16 +112,18 @@
boolean autoFlowControl = ((Boolean)mi.getArguments()[5]).booleanValue();
+ CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
+
ClientConsumer messageHandler =
new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
sessionDelegate, consumerDelegate, consumerID, queueName,
prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
redeliveryDelay, consumerState.getMaxRetryChangeRate(),
- consumerState.getRetryChangeRateInterval());
+ consumerState.getRetryChangeRateInterval(),
+ fcc != null, cm);
sessionState.addCallbackHandler(messageHandler);
- CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.registerHandler(consumerID, messageHandler);
consumerState.setClientConsumer(messageHandler);
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -234,6 +234,8 @@
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
+ boolean result = true;
+
synchronized (state)
{
@@ -254,8 +256,9 @@
throw new IllegalStateException(
"CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
}
-
- state.getClientAckList().add(info);
+
+ result = state.addToClientAckList(info);
+
}
// if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
// However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
@@ -300,12 +303,20 @@
String sessionId = connectionConsumerDelegate != null ?
connectionConsumerDelegate.getID() : state.getSessionID();
- connState.getResourceManager().addAck(txID, sessionId, info);
+ if (info.getSource() != null)
+ {
+ //from a normal session (non CC).
+ result = state.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
+ }
+ else
+ {
+ connState.getResourceManager().addAck(txID, sessionId, info);
+ }
}
}
}
- return null;
+ return Boolean.valueOf(result);
}
public Object handlePostDeliver(Invocation invocation) throws Throwable
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -382,7 +382,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void preDeliver(DeliveryInfo deliveryInfo) throws JMSException
+ public boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -97,7 +97,7 @@
try
{
- handler.handleMessage(dr);
+ handler.handleMessage(dr, this);
}
catch (Exception e)
{
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.Executors;
+import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -128,6 +129,11 @@
private boolean isCC;
+ private Object ackLock = new Object();
+
+ private Object ackSource;
+
+
// Constructors ---------------------------------------------------------------------------------
public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -172,6 +178,8 @@
this.setEnableOrderingGroup(enableOrderingGroup);
this.setDefaultOrderingGroupName(defaultOrderingGroupName);
+
+ this.ackSource = parent.getRemotingConnection().getCallbackManager();
}
// HierarchicalState implementation -------------------------------------------------------------
@@ -293,6 +301,12 @@
ConnectionState connState = (ConnectionState)getParent();
ResourceManager rm = connState.getResourceManager();
+ //this guards against new ack info coming into the list.
+ synchronized (ackLock)
+ {
+ ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
+ }
+
// We need to failover from one session ID to another in the resource manager
rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
@@ -402,6 +416,23 @@
return clientAckList;
}
+ public boolean addToClientAckList(DeliveryInfo info)
+ {
+ synchronized (ackLock)
+ {
+ if (ackSource == info.getSource())
+ {
+ clientAckList.add(info);
+ return true;
+ }
+ else
+ {
+ log.debug("Rejecting ack " + info + " from old source: " + info.getSource() + " on new source " + ackSource);
+ return false;
+ }
+ }
+ }
+
public void setClientAckList(List list)
{
this.clientAckList = list;
@@ -596,5 +627,22 @@
}
}
+ public boolean addAckToResourceManager(ResourceManager rm, Object txID, String sessId, DeliveryInfo info) throws JMSException
+ {
+ synchronized (ackLock)
+ {
+ if (ackSource == info.getSource())
+ {
+ rm.addAck(txID, sessId, info);
+ return true;
+ }
+ else
+ {
+ log.debug("Rejecting tx ack " + info + " from old source " + info.getSource() + " on new source " + ackSource);
+ return false;
+ }
+ }
+ }
+
}
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/DeliveryInfo.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/DeliveryInfo.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/DeliveryInfo.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -58,12 +58,15 @@
//to the connection consumer's session, otherwise it will be null
private SessionDelegate connectionConsumerSession;
+ //mark where the msg is delivered from (a CallbackManager)
+ private Object source;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public DeliveryInfo(MessageProxy msg, String consumerId, String queueName,
- SessionDelegate connectionConsumerSession, boolean shouldAck)
+ SessionDelegate connectionConsumerSession, boolean shouldAck, Object source)
{
this.msg = msg;
@@ -74,6 +77,8 @@
this.connectionConsumerSession = connectionConsumerSession;
this.shouldAck = shouldAck;
+
+ this.source = source;
}
// Public --------------------------------------------------------
@@ -103,6 +108,11 @@
return shouldAck;
}
+ public Object getSource()
+ {
+ return source;
+ }
+
public String toString()
{
return "Delivery[" + getDeliveryID() + ", " + msg + "]";
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-13 02:12:19 UTC (rev 8326)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-13 03:34:20 UTC (rev 8327)
@@ -64,7 +64,7 @@
TextMessageProxy createTextMessage(String text) throws JMSException;
- void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
+ boolean preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
boolean postDeliver() throws JMSException;
More information about the jboss-cvs-commits
mailing list