[jboss-cvs] JBoss Messaging SVN: r8328 - in branches/JBMESSAGING_1878/src/main/org/jboss/jms: client/state and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 13 01:50:12 EDT 2011
Author: gaohoward
Date: 2011-06-13 01:50:11 -0400 (Mon, 13 Jun 2011)
New Revision: 8328
Modified:
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java
branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java
Log:
Fix the transactional (tx) ack case,
and also case 1 in JBMESSAGING-1878.
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-13 03:34:20 UTC (rev 8327)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-13 05:50:11 UTC (rev 8328)
@@ -214,7 +214,7 @@
}
DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, messageSource);
+ new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
m.incDeliveryCount();
@@ -584,7 +584,7 @@
if (!isConnectionConsumer && !ignore)
{
- final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, messageSource);
+ final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, m.getSource());
if (timeout <= 0)
{
@@ -1142,6 +1142,7 @@
//if source changed, discard it.
if (source == messageSource)
{
+ proxy.setSource(source);
//Add it to the buffer
buffer.addLast(proxy, proxy.getJMSPriority());
@@ -1151,6 +1152,10 @@
messageAdded();
}
+ else
+ {
+ log.debug("Discarding message from old source " + source + " on to new source " + messageSource);
+ }
}
}
else
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-13 03:34:20 UTC (rev 8327)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-13 05:50:11 UTC (rev 8328)
@@ -244,6 +244,14 @@
// from before failover waiting in there and we don't want them to get delivered after
// failover.
executor.clearAllExceptCurrentTask();
+
+ //This guards against new ack info coming into the list. It should be done before ClientConsumer.synchronizedWith(),
+ //otherwise a message can be added to the buffer after the buffer is cleared and then added to the ack list.
+ //JBMESSAGING-1878
+ synchronized (ackLock)
+ {
+ ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
+ }
ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
@@ -300,12 +308,6 @@
ConnectionState connState = (ConnectionState)getParent();
ResourceManager rm = connState.getResourceManager();
-
- //This guards against new ack info coming into the list.
- synchronized (ackLock)
- {
- ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
- }
// We need to failover from one session ID to another in the resource manager
rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
Modified: branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java 2011-06-13 03:34:20 UTC (rev 8327)
+++ branches/JBMESSAGING_1878/src/main/org/jboss/jms/message/MessageProxy.java 2011-06-13 05:50:11 UTC (rev 8328)
@@ -95,6 +95,8 @@
protected JBossMessage message;
+
+ private Object source;
// Constructors --------------------------------------------------
@@ -506,5 +508,15 @@
needToCopyHeader = false;
}
+ public void setSource(Object source)
+ {
+ this.source = source;
+ }
+
+ public Object getSource()
+ {
+ return this.source;
+ }
+
// Inner classes -------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list