[jboss-cvs] JBoss Messaging SVN: r8400 - in branches/Branch_1_4/src/main/org/jboss/jms: client/state and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 2 02:17:38 EDT 2011
Author: gaohoward
Date: 2011-08-02 02:17:37 -0400 (Tue, 02 Aug 2011)
New Revision: 8400
Modified:
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
JBMESSAGING-1894
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-08-01 03:15:30 UTC (rev 8399)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-08-02 06:17:37 UTC (rev 8400)
@@ -147,7 +147,7 @@
}
DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, null);
+ new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck, m.getSource());
m.incDeliveryCount();
@@ -158,7 +158,11 @@
//We need to call preDeliver, deliver the message then call postDeliver - this is because
//it is legal to call session.recover(), or session.rollback() from within the onMessage()
//method in which case the last message needs to be delivered so it needs to know about it
- sess.preDeliver(deliveryInfo);
+ if (!sess.preDeliver(deliveryInfo))
+ {
+ if (trace) { log.trace("reference " + deliveryInfo + " rejected, don't call onMessage"); }
+ return;
+ }
}
try
@@ -707,7 +711,7 @@
receiverThread = null;
}
}
-
+
if (trace) { log.trace(this + " receive() returning " + m); }
return m;
@@ -742,8 +746,9 @@
buffer.addFirst(proxy, proxy.getJMSPriority());
+ if (trace) { log.trace(this + " added to the front of buffer " + proxy); }
+
consumeCount--;
-
messageAdded();
}
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-08-01 03:15:30 UTC (rev 8399)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-08-02 06:17:37 UTC (rev 8400)
@@ -303,19 +303,20 @@
String sessionId = connectionConsumerDelegate != null ?
connectionConsumerDelegate.getID() : state.getSessionID();
- if (info.getSource() != null)
+ if (connectionConsumerDelegate == null)
{
//from a normal session (non CC).
result = state.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
}
else
{
- connState.getResourceManager().addAck(txID, sessionId, info);
+ SessionState ccState = (SessionState)connectionConsumerDelegate.getState();
+ result = ccState.addAckToResourceManager(connState.getResourceManager(), txID, sessionId, info);
}
}
}
}
-
+
return Boolean.valueOf(result);
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-08-01 03:15:30 UTC (rev 8399)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-08-02 06:17:37 UTC (rev 8400)
@@ -245,6 +245,14 @@
// failover.
executor.clearAllExceptCurrentTask();
+ ConnectionState connState = (ConnectionState)getParent();
+ ResourceManager rm = connState.getResourceManager();
+
+ List ackInfos = Collections.EMPTY_LIST;
+ ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
+
+ synchronized(rm.failoverLock)
+ {
//this guards against new ack info being added to the list. It must come before ClientConsumer.synchronizedWith(),
//otherwise a message can be added to the buffer after the buffer has been cleared and then also added to the ack list.
//JBMESSAGING-1878
@@ -252,8 +260,6 @@
{
ackSource = ((ConnectionState)newState.getParent()).getRemotingConnection().getCallbackManager();
}
-
- ClientSessionDelegate newDelegate = (ClientSessionDelegate)newState.getDelegate();
for (Iterator i = getChildren().iterator(); i.hasNext(); )
{
@@ -305,14 +311,9 @@
log.trace(this + " synchronized failover browser " + browserDelegate);
}
}
-
- ConnectionState connState = (ConnectionState)getParent();
- ResourceManager rm = connState.getResourceManager();
// We need to failover from one session ID to another in the resource manager
rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
-
- List ackInfos = Collections.EMPTY_LIST;
if (isCC)
{
@@ -382,6 +383,7 @@
ackInfos = rm.getDeliveriesForSession(getSessionID());
}
}
+ }
List recoveryInfos = new ArrayList();
if (!ackInfos.isEmpty())
@@ -399,14 +401,14 @@
}
log.trace(this + " sending delivery recovery " + recoveryInfos + " on failover");
-
+
//Note we only recover sessions that are transacted or client ack
if (transacted || xa || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
{
//Note! We ALWAYS call recoverDeliveries even if there are no deliveries since it also does other stuff
- //like remove from recovery Area refs corresponding to messages in client consumer buffers
+ //like remove from recovery Area refs corresponding to messages in client consumer buffers
- newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
+ newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
}
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java 2011-08-01 03:15:30 UTC (rev 8399)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java 2011-08-02 06:17:37 UTC (rev 8400)
@@ -65,11 +65,11 @@
private boolean trace = log.isTraceEnabled();
private ConcurrentHashMap transactions = new ConcurrentHashMap();
-
- private ConcurrentHashMap convertedIds = new ConcurrentHashMap();
private int serverID;
+ public Object failoverLock = new Object();
+
// Static ---------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ResourceManager.class);
@@ -152,7 +152,7 @@
public List getDeliveriesForSession(String sessionID)
{
List ackInfos = new ArrayList();
-
+
for (Iterator i = transactions.values().iterator(); i.hasNext(); )
{
ClientTransaction tx = (ClientTransaction)i.next();
@@ -161,7 +161,7 @@
ackInfos.addAll(acks);
}
-
+
return ackInfos;
}
@@ -239,6 +239,8 @@
{
if (trace) { log.trace("rolling back local xid " + xid); }
+ synchronized(failoverLock)
+ {
ClientTransaction ts = removeTxInternal(xid);
if (ts == null)
@@ -247,6 +249,7 @@
}
this.rollbackLocal(xid, ts);
+ }
}
private void rollbackLocal(Object xid, ClientTransaction ts) throws JMSException
@@ -373,8 +376,8 @@
ClientTransaction tx = removeTxInternal(xid);
if (tx != null && trace)
- {
- log.trace("got tx: " + tx + " state " + tx.getState());
+ {
+ log.trace("got tx: " + tx + " state " + tx.getState());
}
//roll back for onePhase only. for 2pc, rollback only is processed in prepare
@@ -433,14 +436,27 @@
{
if (trace) { log.trace("rolling back xid " + xid); }
- ClientTransaction tx = removeTxInternal(xid);
+ ClientTransaction tx = null;
+ synchronized(failoverLock)
+ {
+ tx = removeTxInternal(xid);
if (tx == null)
{
throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
}
- this.rollback(xid, tx, connection);
+ //only synchronize local one-phase rollback
+ if (tx.getState() != ClientTransaction.TX_PREPARED)
+ {
+ this.rollback(xid, tx, connection);
+ }
+ }
+
+ if (tx.getState() == ClientTransaction.TX_PREPARED)
+ {
+ this.rollback(xid, tx,connection);
+ }
}
private void rollback(Xid xid, ClientTransaction tx, ConnectionDelegate connection) throws XAException
More information about the jboss-cvs-commits mailing list