[jboss-cvs] JBoss Messaging SVN: r2677 - in trunk: src/main/org/jboss/jms/client/tx and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon May 14 19:05:48 EDT 2007
Author: timfox
Date: 2007-05-14 19:05:48 -0400 (Mon, 14 May 2007)
New Revision: 2677
Added:
trunk/src/main/org/jboss/jms/server/recovery/
trunk/src/main/org/jboss/jms/shared/tx/ClientTransaction.java
trunk/src/main/org/jboss/jms/tx/
Removed:
trunk/src/main/org/jboss/jms/client/tx/ClientTransaction.java
trunk/src/main/org/jboss/jms/recovery/
Modified:
trunk/src/main/org/jboss/jms/client/tx/ResourceManager.java
trunk/src/main/org/jboss/jms/client/tx/TransactionRequest.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
Separate server and client classes step2
Deleted: trunk/src/main/org/jboss/jms/client/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/tx/ClientTransaction.java 2007-05-14 22:55:42 UTC (rev 2676)
+++ trunk/src/main/org/jboss/jms/client/tx/ClientTransaction.java 2007-05-14 23:05:48 UTC (rev 2677)
@@ -1,433 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.tx;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.server.endpoint.Ack;
-import org.jboss.jms.server.endpoint.DefaultAck;
-import org.jboss.jms.server.endpoint.DeliveryInfo;
-import org.jboss.messaging.core.message.MessageFactory;
-import org.jboss.logging.Logger;
-
-/**
- * Holds the state of a transaction on the client side
- *
- * @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
- */
-public class ClientTransaction
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ClientTransaction.class);
-
- public final static byte TX_OPEN = 0;
- public final static byte TX_ENDED = 1;
- public final static byte TX_PREPARED = 2;
- public final static byte TX_COMMITED = 3;
- public final static byte TX_ROLLEDBACK = 4;
-
- private static boolean trace = log.isTraceEnabled();
-
- // Attributes ----------------------------------------------------
-
- private byte state = TX_OPEN;
-
- // Map<Integer(sessionID) - SessionTxState> maintained on the client side
- private Map sessionStatesMap;
-
- // Read from on the server side
- private List sessionStatesList;
-
- private boolean clientSide;
-
- private boolean hasPersistentAcks;
-
- private boolean failedOver;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ClientTransaction()
- {
- clientSide = true;
- }
-
- // Public --------------------------------------------------------
-
- public byte getState()
- {
- return state;
- }
-
- public void addMessage(int sessionId, JBossMessage msg)
- {
- if (!clientSide)
- {
- throw new IllegalStateException("Cannot call this method on the server side");
- }
- SessionTxState sessionTxState = getSessionTxState(sessionId);
-
- sessionTxState.addMessage(msg);
- }
-
- public void addAck(int sessionId, DeliveryInfo info)
- {
- if (!clientSide)
- {
- throw new IllegalStateException("Cannot call this method on the server side");
- }
- SessionTxState sessionTxState = getSessionTxState(sessionId);
-
- sessionTxState.addAck(info);
-
- if (info.getMessageProxy().getMessage().isReliable())
- {
- hasPersistentAcks = true;
- }
- }
-
- public boolean hasPersistentAcks()
- {
- return hasPersistentAcks;
- }
-
- public boolean isFailedOver()
- {
- return failedOver;
- }
-
- public void clearMessages()
- {
- if (!clientSide)
- {
- throw new IllegalStateException("Cannot call this method on the server side");
- }
-
- if (sessionStatesMap != null)
- {
- // This can be null if the tx was recreated on the client side due to recovery
-
- for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
- {
- SessionTxState sessionTxState = (SessionTxState)i.next();
- sessionTxState.clearMessages();
- }
- }
- }
-
- public void setState(byte state)
- {
- if (!clientSide)
- {
- throw new IllegalStateException("Cannot call this method on the server side");
- }
- this.state = state;
- }
-
- public List getSessionStates()
- {
- if (sessionStatesList != null)
- {
- return sessionStatesList;
- }
- else
- {
- return sessionStatesMap == null ?
- Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
- }
- }
-
- /*
- * Substitute newSessionID for oldSessionID
- */
- public void handleFailover(int newServerID, int oldSessionID, int newSessionID)
- {
- if (!clientSide)
- {
- throw new IllegalStateException("Cannot call this method on the server side");
- }
-
- // Note we have to do this in one go since there may be overlap between old and new session
- // IDs and we don't want to overwrite keys in the map.
-
- Map tmpMap = null;
-
- if (sessionStatesMap != null)
- {
- for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
- {
-
- SessionTxState state = (SessionTxState)i.next();
- state.handleFailover(newServerID, oldSessionID, newSessionID);
-
- if (tmpMap == null)
- {
- tmpMap = new LinkedHashMap();
- }
- tmpMap.put(new Integer(newSessionID), state);
- }
- }
-
- if (tmpMap != null)
- {
- // swap
- sessionStatesMap = tmpMap;
- }
-
- failedOver = true;
- }
-
- /**
- * May return an empty list, but never null.
- */
- public List getDeliveriesForSession(int sessionID)
- {
- if (!clientSide)
- {
- throw new IllegalStateException("Cannot call this method on the server side");
- }
-
- if (sessionStatesMap == null)
- {
- return Collections.EMPTY_LIST;
- }
- else
- {
- SessionTxState state = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
-
- if (state != null)
- {
- return state.getAcks();
- }
- else
- {
- return Collections.EMPTY_LIST;
- }
- }
- }
-
- // Streamable implementation ---------------------------------
-
- public void write(DataOutputStream out) throws Exception
- {
- out.writeByte(state);
-
- if (sessionStatesMap == null)
- {
- out.writeInt(0);
- }
- else
- {
- out.writeInt(sessionStatesMap.size());
-
- Iterator iter = sessionStatesMap.values().iterator();
-
- while (iter.hasNext())
- {
- SessionTxState state = (SessionTxState)iter.next();
-
- out.writeInt(state.getSessionId());
-
- List msgs = state.getMsgs();
-
- out.writeInt(msgs.size());
-
- Iterator iter2 = msgs.iterator();
-
- while (iter2.hasNext())
- {
- JBossMessage m = (JBossMessage)iter2.next();
-
- out.writeByte(m.getType());
-
- m.write(out);
- }
-
- List acks = state.getAcks();
-
- out.writeInt(acks.size());
-
- iter2 = acks.iterator();
-
- while (iter2.hasNext())
- {
- DeliveryInfo ack = (DeliveryInfo)iter2.next();
-
- //We only need the delivery id written
- out.writeLong(ack.getMessageProxy().getDeliveryId());
- }
- }
- }
- }
-
-
- public void read(DataInputStream in) throws Exception
- {
- clientSide = false;
-
- state = in.readByte();
-
- int numSessions = in.readInt();
-
- //Read in as a list since we don't want the extra overhead of putting into a map
- //which won't be used on the server side
- sessionStatesList = new ArrayList(numSessions);
-
- for (int i = 0; i < numSessions; i++)
- {
- int sessionId = in.readInt();
-
- SessionTxState sessionState = new SessionTxState(sessionId);
-
- sessionStatesList.add(sessionState);
-
- int numMsgs = in.readInt();
-
- for (int j = 0; j < numMsgs; j++)
- {
- byte type = in.readByte();
-
- JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
-
- msg.read(in);
-
- sessionState.addMessage(msg);
- }
-
- int numAcks = in.readInt();
-
- for (int j = 0; j < numAcks; j++)
- {
- long ack = in.readLong();
-
- sessionState.addAck(new DefaultAck(ack));
- }
- }
- }
-
- // Protected -----------------------------------------------------
-
- // Package Private -----------------------------------------------
-
- // Private -------------------------------------------------------
-
- private SessionTxState getSessionTxState(int sessionID)
- {
- if (sessionStatesMap == null)
- {
- sessionStatesMap = new LinkedHashMap();
- }
-
- SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
-
- if (sessionTxState == null)
- {
- sessionTxState = new SessionTxState(sessionID);
- sessionStatesMap.put(new Integer(sessionID), sessionTxState);
- }
-
- return sessionTxState;
- }
-
- // Inner Classes -------------------------------------------------
-
- public class SessionTxState
- {
- private int sessionID;
-
- // We record the server id when doing failover to avoid overwriting the sesion ID again if
- // multiple connections fail on the same resource mamanger but fail onto old values of the
- // session ID. This prevents the ID being failed over more than once for the same server.
- private int serverID = -1;
-
- private List msgs = new ArrayList();
- private List acks = new ArrayList();
-
- SessionTxState(int sessionID)
- {
- this.sessionID = sessionID;
- }
-
- void addMessage(JBossMessage msg)
- {
- msgs.add(msg);
- }
-
- void addAck(Ack ack)
- {
- acks.add(ack);
- }
-
- public List getMsgs()
- {
- return msgs;
- }
-
- public List getAcks()
- {
- return acks;
- }
-
- public int getSessionId()
- {
- return sessionID;
- }
-
- void handleFailover(int newServerID, int oldSessionID, int newSessionID)
- {
- if (sessionID == oldSessionID && serverID != newServerID)
- {
- sessionID = newSessionID;
- serverID = newServerID;
-
- // Remove any non persistent acks
- for(Iterator i = acks.iterator(); i.hasNext(); )
- {
- DeliveryInfo di = (DeliveryInfo)i.next();
-
- if (!di.getMessageProxy().getMessage().isReliable())
- {
- if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
- i.remove();
- }
- }
- }
- }
-
- void clearMessages()
- {
- msgs.clear();
- }
-
- }
-
-}
Modified: trunk/src/main/org/jboss/jms/client/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/tx/ResourceManager.java 2007-05-14 22:55:42 UTC (rev 2676)
+++ trunk/src/main/org/jboss/jms/client/tx/ResourceManager.java 2007-05-14 23:05:48 UTC (rev 2677)
@@ -38,7 +38,8 @@
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.server.endpoint.DeliveryInfo;
-import org.jboss.jms.client.tx.ClientTransaction.SessionTxState;
+import org.jboss.jms.shared.tx.ClientTransaction;
+import org.jboss.jms.shared.tx.ClientTransaction.SessionTxState;
import org.jboss.jms.util.MessagingTransactionRolledBackException;
import org.jboss.jms.util.MessagingXAException;
import org.jboss.logging.Logger;
Modified: trunk/src/main/org/jboss/jms/client/tx/TransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/tx/TransactionRequest.java 2007-05-14 22:55:42 UTC (rev 2676)
+++ trunk/src/main/org/jboss/jms/client/tx/TransactionRequest.java 2007-05-14 23:05:48 UTC (rev 2677)
@@ -29,6 +29,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.tx.MessagingXid;
import org.jboss.messaging.util.Streamable;
+import org.jboss.jms.shared.tx.ClientTransaction;
/**
* This class contains all the data needed to perform a JMS transaction.
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-05-14 22:55:42 UTC (rev 2676)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-05-14 23:05:48 UTC (rev 2677)
@@ -35,9 +35,7 @@
import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.remoting.CallbackManager;
-import org.jboss.jms.client.tx.ClientTransaction;
import org.jboss.jms.client.tx.TransactionRequest;
-import org.jboss.jms.client.tx.ClientTransaction.SessionTxState;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
@@ -48,6 +46,8 @@
import org.jboss.jms.server.endpoint.advised.SessionAdvised;
import org.jboss.jms.server.messagecounter.MessageCounter;
import org.jboss.jms.server.remoting.JMSWireFormat;
+import org.jboss.jms.shared.tx.ClientTransaction;
+import org.jboss.jms.shared.tx.ClientTransaction.SessionTxState;
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.jms.util.ToString;
import org.jboss.jms.wireformat.Dispatcher;
Copied: trunk/src/main/org/jboss/jms/server/recovery (from rev 2670, trunk/src/main/org/jboss/jms/recovery)
Modified: trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery.java
===================================================================
--- trunk/src/main/org/jboss/jms/recovery/MessagingXAResourceRecovery.java 2007-05-12 14:43:53 UTC (rev 2670)
+++ trunk/src/main/org/jboss/jms/server/recovery/MessagingXAResourceRecovery.java 2007-05-14 23:05:48 UTC (rev 2677)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.jms.recovery;
+package org.jboss.jms.server.recovery;
import java.util.StringTokenizer;
Copied: trunk/src/main/org/jboss/jms/shared/tx/ClientTransaction.java (from rev 2676, trunk/src/main/org/jboss/jms/client/tx/ClientTransaction.java)
===================================================================
--- trunk/src/main/org/jboss/jms/shared/tx/ClientTransaction.java (rev 0)
+++ trunk/src/main/org/jboss/jms/shared/tx/ClientTransaction.java 2007-05-14 23:05:48 UTC (rev 2677)
@@ -0,0 +1,433 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.shared.tx;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.endpoint.Ack;
+import org.jboss.jms.server.endpoint.DefaultAck;
+import org.jboss.jms.server.endpoint.DeliveryInfo;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.message.MessageFactory;
+
+/**
+ * Holds the state of a transaction on the client side
+ *
+ * @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
+ */
+public class ClientTransaction
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ClientTransaction.class);
+
+ public final static byte TX_OPEN = 0;
+ public final static byte TX_ENDED = 1;
+ public final static byte TX_PREPARED = 2;
+ public final static byte TX_COMMITED = 3;
+ public final static byte TX_ROLLEDBACK = 4;
+
+ private static boolean trace = log.isTraceEnabled();
+
+ // Attributes ----------------------------------------------------
+
+ private byte state = TX_OPEN;
+
+ // Map<Integer(sessionID) - SessionTxState> maintained on the client side
+ private Map sessionStatesMap;
+
+ // Read from on the server side
+ private List sessionStatesList;
+
+ private boolean clientSide;
+
+ private boolean hasPersistentAcks;
+
+ private boolean failedOver;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ClientTransaction()
+ {
+ clientSide = true;
+ }
+
+ // Public --------------------------------------------------------
+
+ public byte getState()
+ {
+ return state;
+ }
+
+ public void addMessage(int sessionId, JBossMessage msg)
+ {
+ if (!clientSide)
+ {
+ throw new IllegalStateException("Cannot call this method on the server side");
+ }
+ SessionTxState sessionTxState = getSessionTxState(sessionId);
+
+ sessionTxState.addMessage(msg);
+ }
+
+ public void addAck(int sessionId, DeliveryInfo info)
+ {
+ if (!clientSide)
+ {
+ throw new IllegalStateException("Cannot call this method on the server side");
+ }
+ SessionTxState sessionTxState = getSessionTxState(sessionId);
+
+ sessionTxState.addAck(info);
+
+ if (info.getMessageProxy().getMessage().isReliable())
+ {
+ hasPersistentAcks = true;
+ }
+ }
+
+ public boolean hasPersistentAcks()
+ {
+ return hasPersistentAcks;
+ }
+
+ public boolean isFailedOver()
+ {
+ return failedOver;
+ }
+
+ public void clearMessages()
+ {
+ if (!clientSide)
+ {
+ throw new IllegalStateException("Cannot call this method on the server side");
+ }
+
+ if (sessionStatesMap != null)
+ {
+ // This can be null if the tx was recreated on the client side due to recovery
+
+ for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
+ {
+ SessionTxState sessionTxState = (SessionTxState)i.next();
+ sessionTxState.clearMessages();
+ }
+ }
+ }
+
+ public void setState(byte state)
+ {
+ if (!clientSide)
+ {
+ throw new IllegalStateException("Cannot call this method on the server side");
+ }
+ this.state = state;
+ }
+
+ public List getSessionStates()
+ {
+ if (sessionStatesList != null)
+ {
+ return sessionStatesList;
+ }
+ else
+ {
+ return sessionStatesMap == null ?
+ Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+ }
+ }
+
+ /*
+ * Substitute newSessionID for oldSessionID
+ */
+ public void handleFailover(int newServerID, int oldSessionID, int newSessionID)
+ {
+ if (!clientSide)
+ {
+ throw new IllegalStateException("Cannot call this method on the server side");
+ }
+
+ // Note we have to do this in one go since there may be overlap between old and new session
+ // IDs and we don't want to overwrite keys in the map.
+
+ Map tmpMap = null;
+
+ if (sessionStatesMap != null)
+ {
+ for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
+ {
+
+ SessionTxState state = (SessionTxState)i.next();
+ state.handleFailover(newServerID, oldSessionID, newSessionID);
+
+ if (tmpMap == null)
+ {
+ tmpMap = new LinkedHashMap();
+ }
+ tmpMap.put(new Integer(newSessionID), state);
+ }
+ }
+
+ if (tmpMap != null)
+ {
+ // swap
+ sessionStatesMap = tmpMap;
+ }
+
+ failedOver = true;
+ }
+
+ /**
+ * May return an empty list, but never null.
+ */
+ public List getDeliveriesForSession(int sessionID)
+ {
+ if (!clientSide)
+ {
+ throw new IllegalStateException("Cannot call this method on the server side");
+ }
+
+ if (sessionStatesMap == null)
+ {
+ return Collections.EMPTY_LIST;
+ }
+ else
+ {
+ SessionTxState state = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
+
+ if (state != null)
+ {
+ return state.getAcks();
+ }
+ else
+ {
+ return Collections.EMPTY_LIST;
+ }
+ }
+ }
+
+ // Streamable implementation ---------------------------------
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeByte(state);
+
+ if (sessionStatesMap == null)
+ {
+ out.writeInt(0);
+ }
+ else
+ {
+ out.writeInt(sessionStatesMap.size());
+
+ Iterator iter = sessionStatesMap.values().iterator();
+
+ while (iter.hasNext())
+ {
+ SessionTxState state = (SessionTxState)iter.next();
+
+ out.writeInt(state.getSessionId());
+
+ List msgs = state.getMsgs();
+
+ out.writeInt(msgs.size());
+
+ Iterator iter2 = msgs.iterator();
+
+ while (iter2.hasNext())
+ {
+ JBossMessage m = (JBossMessage)iter2.next();
+
+ out.writeByte(m.getType());
+
+ m.write(out);
+ }
+
+ List acks = state.getAcks();
+
+ out.writeInt(acks.size());
+
+ iter2 = acks.iterator();
+
+ while (iter2.hasNext())
+ {
+ DeliveryInfo ack = (DeliveryInfo)iter2.next();
+
+ //We only need the delivery id written
+ out.writeLong(ack.getMessageProxy().getDeliveryId());
+ }
+ }
+ }
+ }
+
+
+ public void read(DataInputStream in) throws Exception
+ {
+ clientSide = false;
+
+ state = in.readByte();
+
+ int numSessions = in.readInt();
+
+ //Read in as a list since we don't want the extra overhead of putting into a map
+ //which won't be used on the server side
+ sessionStatesList = new ArrayList(numSessions);
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ int sessionId = in.readInt();
+
+ SessionTxState sessionState = new SessionTxState(sessionId);
+
+ sessionStatesList.add(sessionState);
+
+ int numMsgs = in.readInt();
+
+ for (int j = 0; j < numMsgs; j++)
+ {
+ byte type = in.readByte();
+
+ JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
+
+ msg.read(in);
+
+ sessionState.addMessage(msg);
+ }
+
+ int numAcks = in.readInt();
+
+ for (int j = 0; j < numAcks; j++)
+ {
+ long ack = in.readLong();
+
+ sessionState.addAck(new DefaultAck(ack));
+ }
+ }
+ }
+
+ // Protected -----------------------------------------------------
+
+ // Package Private -----------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private SessionTxState getSessionTxState(int sessionID)
+ {
+ if (sessionStatesMap == null)
+ {
+ sessionStatesMap = new LinkedHashMap();
+ }
+
+ SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
+
+ if (sessionTxState == null)
+ {
+ sessionTxState = new SessionTxState(sessionID);
+ sessionStatesMap.put(new Integer(sessionID), sessionTxState);
+ }
+
+ return sessionTxState;
+ }
+
+ // Inner Classes -------------------------------------------------
+
+ public class SessionTxState
+ {
+ private int sessionID;
+
+ // We record the server id when doing failover to avoid overwriting the sesion ID again if
+ // multiple connections fail on the same resource mamanger but fail onto old values of the
+ // session ID. This prevents the ID being failed over more than once for the same server.
+ private int serverID = -1;
+
+ private List msgs = new ArrayList();
+ private List acks = new ArrayList();
+
+ SessionTxState(int sessionID)
+ {
+ this.sessionID = sessionID;
+ }
+
+ void addMessage(JBossMessage msg)
+ {
+ msgs.add(msg);
+ }
+
+ void addAck(Ack ack)
+ {
+ acks.add(ack);
+ }
+
+ public List getMsgs()
+ {
+ return msgs;
+ }
+
+ public List getAcks()
+ {
+ return acks;
+ }
+
+ public int getSessionId()
+ {
+ return sessionID;
+ }
+
+ void handleFailover(int newServerID, int oldSessionID, int newSessionID)
+ {
+ if (sessionID == oldSessionID && serverID != newServerID)
+ {
+ sessionID = newSessionID;
+ serverID = newServerID;
+
+ // Remove any non persistent acks
+ for(Iterator i = acks.iterator(); i.hasNext(); )
+ {
+ DeliveryInfo di = (DeliveryInfo)i.next();
+
+ if (!di.getMessageProxy().getMessage().isReliable())
+ {
+ if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
+ i.remove();
+ }
+ }
+ }
+ }
+
+ void clearMessages()
+ {
+ msgs.clear();
+ }
+
+ }
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-05-14 22:55:42 UTC (rev 2676)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-05-14 23:05:48 UTC (rev 2677)
@@ -33,7 +33,6 @@
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.client.tx.ClientTransaction;
import org.jboss.jms.client.tx.TransactionRequest;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTemporaryQueue;
@@ -46,6 +45,7 @@
import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DeliveryRecovery;
import org.jboss.jms.server.remoting.JMSWireFormat;
+import org.jboss.jms.shared.tx.ClientTransaction;
import org.jboss.jms.wireformat.BrowserHasNextMessageRequest;
import org.jboss.jms.wireformat.BrowserHasNextMessageResponse;
import org.jboss.jms.wireformat.BrowserNextMessageBlockRequest;
More information about the jboss-cvs-commits
mailing list