[jboss-cvs] jboss-jms/src/main/org/jboss/jms/tx ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:47 EDT 2006
User: timfox
Date: 06/07/17 13:14:47
Modified: src/main/org/jboss/jms/tx AckInfo.java
MessagingXAResource.java ResourceManager.java
TxState.java
Log:
Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
Revision Changes Path
1.12 +20 -3 jboss-jms/src/main/org/jboss/jms/tx/AckInfo.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: AckInfo.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/AckInfo.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- AckInfo.java 25 Mar 2006 05:09:16 -0000 1.11
+++ AckInfo.java 17 Jul 2006 17:14:47 -0000 1.12
@@ -26,13 +26,15 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.jboss.jms.message.MessageProxy;
+
/**
* Struct like class for holding information regarding an acknowledgement to be passed to the server
* for processing.
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
*
- * $Id: AckInfo.java,v 1.11 2006/03/25 05:09:16 ovidiu Exp $
+ * $Id: AckInfo.java,v 1.12 2006/07/17 17:14:47 timfox Exp $
*/
public class AckInfo implements Externalizable
{
@@ -46,6 +48,9 @@
protected int consumerID;
+ //The actual proxy must not get serialized
+ protected transient MessageProxy msg;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -54,6 +59,13 @@
{
}
+ public AckInfo(MessageProxy proxy, int consumerID)
+ {
+ this.msg = proxy;
+ this.messageID = proxy.getMessage().getMessageID();
+ this.consumerID = consumerID;
+ }
+
public AckInfo(long messageID, int consumerID)
{
this.messageID = messageID;
@@ -72,6 +84,11 @@
return consumerID;
}
+ public MessageProxy getMessage()
+ {
+ return msg;
+ }
+
public String toString()
{
return "AckInfo[" + messageID + ", " + consumerID + "]";
1.4 +3 -0 jboss-jms/src/main/org/jboss/jms/tx/MessagingXAResource.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MessagingXAResource.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/MessagingXAResource.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -b -r1.3 -r1.4
--- MessagingXAResource.java 29 Apr 2006 02:43:41 -0000 1.3
+++ MessagingXAResource.java 17 Jul 2006 17:14:47 -0000 1.4
@@ -162,6 +162,9 @@
{
if (trace) { log.trace(this + " rolling back " + xid); }
+ //TODO Hmmm on rollback should we also stop and start the consumers to remove any transient messages
+ //Like we do on local session rollback??
+
rm.rollback(xid, connection);
}
1.21 +82 -2 jboss-jms/src/main/org/jboss/jms/tx/ResourceManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ResourceManager.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/ResourceManager.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -b -r1.20 -r1.21
--- ResourceManager.java 29 Apr 2006 02:43:41 -0000 1.20
+++ ResourceManager.java 17 Jul 2006 17:14:47 -0000 1.21
@@ -21,6 +21,13 @@
*/
package org.jboss.jms.tx;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -30,6 +37,7 @@
import javax.transaction.xa.Xid;
import org.jboss.jms.delegate.ConnectionDelegate;
+import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.util.MessagingXAException;
import org.jboss.logging.Logger;
@@ -47,9 +55,9 @@
*
* @author <a href="mailto:Cojonudo14 at hotmail.com">Hiram Chirino</a>
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
- * @version $Revision: 1.20 $
+ * @version $Revision: 1.21 $
*
- * $Id: ResourceManager.java,v 1.20 2006/04/29 02:43:41 ovidiu Exp $
+ * $Id: ResourceManager.java,v 1.21 2006/07/17 17:14:47 timfox Exp $
*/
public class ResourceManager
{
@@ -119,6 +127,8 @@
*
* @param xid - The id of the transaction to add the message to
* @param ackInfo Information describing the acknowledgement
+ * @param sessionState - the session the ack is in - we need this so on rollback we can tell each session
+ * to redeliver it's messages
*/
public void addAck(Object xid, AckInfo ackInfo) throws JMSException
{
@@ -170,6 +180,8 @@
new TransactionRequest(TransactionRequest.ONE_PHASE_ROLLBACK_REQUEST, null, tx);
connection.sendTransaction(request);
+
+ redeliverMessages(tx);
}
public void commit(Xid xid, boolean onePhase, ConnectionDelegate connection) throws XAException
@@ -252,8 +264,76 @@
}
sendTransactionXA(request, connection);
+
+ try
+ {
+ redeliverMessages(tx);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to redeliver", e);
+ }
+ }
+
+
+ /*
+ * Rollback has occurred so we need to redeliver any unacked messages corresponding to the acks
+ * is in the transaction
+ */
+ private void redeliverMessages(TxState tx) throws JMSException
+ {
+ Iterator iter = tx.getAcks().iterator();
+
+ //Sort them into lists - one for each session
+
+ //We use a LinkedHashMap since we need to preserve the order of the sessions
+ Map toAck = new LinkedHashMap();
+
+ while (iter.hasNext())
+ {
+ AckInfo ack = (AckInfo)iter.next();
+
+ SessionDelegate del = ack.msg.getSessionDelegate();
+
+ List acks = (List)toAck.get(del);
+
+ if (acks == null)
+ {
+ acks = new ArrayList();
+
+ toAck.put(del, acks);
+ }
+
+ acks.add(ack);
}
+ //Now tell each session to redeliver
+
+ LinkedList l = new LinkedList();
+
+ iter = toAck.entrySet().iterator();
+
+ //need to reverse the order
+ while (iter.hasNext())
+ {
+ Object entry = iter.next();
+
+ l.addFirst(entry);
+ }
+
+ iter = l.iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ SessionDelegate sess = (SessionDelegate)entry.getKey();
+
+ List acks = (List)entry.getValue();
+
+ sess.redeliver(acks);
+ }
+ }
// Protected ------------------------------------------------------
1.9 +4 -1 jboss-jms/src/main/org/jboss/jms/tx/TxState.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TxState.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/tx/TxState.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -b -r1.8 -r1.9
--- TxState.java 20 Apr 2006 20:42:27 -0000 1.8
+++ TxState.java 17 Jul 2006 17:14:47 -0000 1.9
@@ -26,9 +26,12 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
+import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.message.JBossMessage;
import org.jboss.messaging.core.message.MessageFactory;
More information about the jboss-cvs-commits
mailing list