[jboss-cvs] JBoss Messaging SVN: r1913 - in trunk: . src/etc/server/default/deploy src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jan 6 04:18:33 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-06 04:18:24 -0500 (Sat, 06 Jan 2007)
New Revision: 1913
Modified:
trunk/messaging.ipr
trunk/src/etc/server/default/deploy/multiplexer-service.xml
trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
Added extra session failover tests (http://jira.jboss.org/jira/browse/JBMESSAGING-707)
and also fixed a few bugs in the process.
Modified: trunk/messaging.ipr
===================================================================
--- trunk/messaging.ipr 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/messaging.ipr 2007-01-06 09:18:24 UTC (rev 1913)
@@ -259,7 +259,9 @@
<root url="jar://$PROJECT_DIR$/thirdparty/jgroups/lib/jgroups.jar!/" />
</CLASSES>
<JAVADOC />
- <SOURCES />
+ <SOURCES>
+ <root url="file://C:/work/src/JGroups-2.4.1.src/src" />
+ </SOURCES>
</library>
<library name="junit">
<CLASSES>
Modified: trunk/src/etc/server/default/deploy/multiplexer-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/multiplexer-service.xml 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/etc/server/default/deploy/multiplexer-service.xml 2007-01-06 09:18:24 UTC (rev 1913)
@@ -1,9 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE server
- PUBLIC "-//JBoss//DTD MBean Service 3.2//EN"
- "http://www.jboss.org/j2ee/dtd/jboss-service_3_2.dtd">
-
<server>
Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -164,22 +164,29 @@
// attempt to grab the reader's lock and go forward
+ boolean exempt = false;
+
try
{
- boolean acquired = false;
+ exempt = isInvocationExempt(methodName);
- while(!acquired)
+ if (!exempt)
{
- try
+ boolean acquired = false;
+
+ while(!acquired)
{
- acquired = readLock.attempt(500);
+ try
+ {
+ acquired = readLock.attempt(500);
+ }
+ catch(InterruptedException e)
+ {
+ // OK
+ }
+
+ if (trace && !acquired ) { log.trace(methodName + "() trying to pass through " + this); }
}
- catch(InterruptedException e)
- {
- // OK
- }
-
- if (trace && !acquired ) { log.trace(methodName + "() trying to pass through " + this); }
}
synchronized(this)
@@ -191,13 +198,17 @@
}
}
- if(trace) { log.trace(this + " has let " + methodName + "() pass through"); }
+ if (trace) { log.trace(this + " allowed " + (exempt ? "exempt" : "") + " method " + methodName + "() to pass through"); }
return invocation.invokeNext();
}
finally
{
- readLock.release();
+ if (!exempt)
+ {
+ readLock.release();
+ }
+
synchronized(this)
{
activeThreadsCount--;
@@ -206,6 +217,7 @@
activeMethods.remove(methodName);
}
}
+
}
}
@@ -225,5 +237,10 @@
// Private --------------------------------------------------------------------------------------
+ private boolean isInvocationExempt(String methodName)
+ {
+ return "recoverDeliveries".equals(methodName);
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -238,9 +238,9 @@
if (!isTransacted() || (isXA() && getCurrentTxId() == null))
{
// Non transacted session or an XA session with no transaction set (it falls back
- // to auto_ack)
+ // to AUTO_ACKNOWLEDGE)
- log.debug(this + " is not transacted (or XA with no tx set), " +
+ log.debug(this + " is not transacted (or XA with no transaction set), " +
"retrieving deliveries from session state");
// We remove any unacked non-persistent messages - this is because we don't want to ack
@@ -284,31 +284,27 @@
else
{
// Transacted session - we need to get the acks from the resource manager. BTW we have
- // kept the old resource manager
+ // kept the old resource manager.
ackInfos = rm.getDeliveriesForSession(getSessionID());
}
if (!ackInfos.isEmpty())
{
- SessionDelegate nd = (SessionDelegate)getDelegate();
-
List recoveryInfos = new ArrayList();
-
for (Iterator i = ackInfos.iterator(); i.hasNext(); )
{
- DeliveryInfo info = (DeliveryInfo)i.next();
-
+ DeliveryInfo del = (DeliveryInfo)i.next();
DeliveryRecovery recInfo =
- new DeliveryRecovery(info.getMessageProxy().getDeliveryId(),
- info.getMessageProxy().getMessage().getMessageID(),
- info.getChannelId());
+ new DeliveryRecovery(del.getMessageProxy().getDeliveryId(),
+ del.getMessageProxy().getMessage().getMessageID(),
+ del.getChannelId());
recoveryInfos.add(recInfo);
}
log.debug(this + " sending delivery recovery " + recoveryInfos + " on failover");
- nd.recoverDeliveries(recoveryInfos);
+ newDelegate.recoverDeliveries(recoveryInfos);
}
else
{
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -835,7 +835,7 @@
// This node may be failing over for another node - in which case we must wait for that to be
// complete.
- log.info(this + " waiting for server-side failover for failed node " + failedNodeID + " to complete");
+ log.debug(this + " waiting for server-side failover for failed node " + failedNodeID + " to complete");
Replicator replicator = getReplicator();
@@ -863,12 +863,12 @@
if (status.isFailedOverForNode(failedNodeID))
{
- log.info(this + ": failover is complete on node " + nid);
+ log.debug(this + ": failover is complete on node " + nid);
return nid.intValue();
}
else if (status.isFailingOverForNode(failedNodeID))
{
- log.info(this + ": fail over is in progress on node " + nid);
+ log.debug(this + ": fail over is in progress on node " + nid);
// A server has started failing over for the failed node, but not completed.
// If it's not this node then we immediately return so the connection can be
@@ -883,7 +883,7 @@
if (completeToWait <= 0)
{
// Give up now
- log.info(this + " already waited long enough for failover to complete, giving up");
+ log.debug(this + " already waited long enough for failover to complete, giving up");
return -1;
}
@@ -919,7 +919,7 @@
if (startToWait <= 0)
{
// Don't want to wait again
- log.info(this + " already waited long enough for failover to start, giving up");
+ log.debug(this + " already waited long enough for failover to start, giving up");
return -1;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DeliveryRecovery.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -39,56 +39,74 @@
*
*/
public class DeliveryRecovery implements Streamable
-{
- private long deliveryId;
-
- private long messageId;
-
- private long channelId;
-
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private long deliveryID;
+ private long messageID;
+ private long channelID;
+
+ // Constructors ---------------------------------------------------------------------------------
+
public DeliveryRecovery()
- {
+ {
}
-
- public DeliveryRecovery(long deliveryId, long messageId, long channelId)
+
+ public DeliveryRecovery(long deliveryID, long messageID, long channelID)
{
- this.deliveryId = deliveryId;
-
- this.messageId = messageId;
-
- this.channelId = channelId;
+ this.deliveryID = deliveryID;
+ this.messageID = messageID;
+ this.channelID = channelID;
}
-
- public long getDeliveryId()
+
+ // Streamable implementation --------------------------------------------------------------------
+
+ public void read(DataInputStream in) throws Exception
{
- return deliveryId;
+ deliveryID = in.readLong();
+ messageID = in.readLong();
+ channelID = in.readLong();
}
-
- public long getMessageId()
+
+ public void write(DataOutputStream out) throws Exception
{
- return messageId;
+ out.writeLong(deliveryID);
+ out.writeLong(messageID);
+ out.writeLong(channelID);
}
-
- public long getChannelId()
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public long getDeliveryID()
{
- return channelId;
+ return deliveryID;
}
- public void read(DataInputStream in) throws Exception
+ public long getMessageID()
{
- deliveryId = in.readLong();
-
- messageId = in.readLong();
-
- channelId = in.readLong();
+ return messageID;
}
- public void write(DataOutputStream out) throws Exception
+ public long getChannelID()
{
- out.writeLong(deliveryId);
-
- out.writeLong(messageId);
-
- out.writeLong(channelId);
+ return channelID;
}
+
+ public String toString()
+ {
+ return "DeliveryRecovery[ID=" + deliveryID + ", MID=" + messageID + ", CID=" + channelID + "]";
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -411,7 +411,7 @@
{
DeliveryRecovery deliveryInfo = (DeliveryRecovery)iter.next();
- Long channelId = new Long(deliveryInfo.getChannelId());
+ Long channelId = new Long(deliveryInfo.getChannelID());
List acks = (List)ackMap.get(channelId);
@@ -449,7 +449,7 @@
{
DeliveryRecovery info = (DeliveryRecovery)iter2.next();
- ids.add(new Long(info.getMessageId()));
+ ids.add(new Long(info.getMessageID()));
}
Queue queue = binding.getQueue();
@@ -466,7 +466,7 @@
DeliveryRecovery info = (DeliveryRecovery)iter3.next();
- long deliveryId = info.getDeliveryId();
+ long deliveryId = info.getDeliveryID();
maxDeliveryId = Math.max(maxDeliveryId, deliveryId);
@@ -849,7 +849,7 @@
void acknowledgeTransactionally(List acks, Transaction tx) throws Throwable
{
- if (trace) { log.trace(this + " acknowledging transactionally " + acks.size() + " for tx: " + tx); }
+ if (trace) { log.trace(this + " acknowledging transactionally " + acks.size() + " messages for " + tx); }
DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getCallback(this);
@@ -859,14 +859,10 @@
tx.addCallback(deliveryCallback, this);
}
- Iterator iter = acks.iterator();
-
- while (iter.hasNext())
+ for(Iterator i = acks.iterator(); i.hasNext(); )
{
- Ack ack = (Ack)iter.next();
-
+ Ack ack = (Ack)i.next();
Long id = new Long(ack.getDeliveryID());
-
DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
if (rec == null)
@@ -875,7 +871,6 @@
}
deliveryCallback.addDeliveryId(id);
-
rec.del.acknowledge(tx);
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -126,7 +126,8 @@
void send(JBossMessage message) throws JMSException;
/**
- * Send delivery info to the server so the delivery lists can be repopulated used at failover
+ * Send delivery info to the server so the delivery lists can be repopulated. Used only in
+ * failover.
*/
void recoverDeliveries(List createInfos) throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -35,6 +35,7 @@
import org.jboss.jms.server.endpoint.DefaultAck;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.messaging.core.message.MessageFactory;
+import org.jboss.logging.Logger;
/**
* Holds the state of a transaction on the client side
@@ -42,47 +43,47 @@
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
*/
public class ClientTransaction
-{
+{
// Constants -----------------------------------------------------
-
+
+ private static final Logger log = Logger.getLogger(ClientTransaction.class);
+
public final static byte TX_OPEN = 0;
-
public final static byte TX_ENDED = 1;
-
public final static byte TX_PREPARED = 2;
-
public final static byte TX_COMMITED = 3;
-
public final static byte TX_ROLLEDBACK = 4;
-
+
+ private static boolean trace = log.isTraceEnabled();
+
// Attributes ----------------------------------------------------
-
+
private byte state = TX_OPEN;
-
- //Maintained on the client side
+
+ // Map<Integer(sessionID) - SessionTxState> maintained on the client side
private Map sessionStatesMap;
-
- //Read from on the server side
+
+ // Read from on the server side
private List sessionStatesList;
-
+
private boolean clientSide;
-
+
// Static --------------------------------------------------------
-
+
// Constructors --------------------------------------------------
-
+
public ClientTransaction()
- {
+ {
clientSide = true;
}
-
+
// Public --------------------------------------------------------
-
+
public byte getState()
{
return state;
}
-
+
public void addMessage(int sessionId, JBossMessage msg)
{
if (!clientSide)
@@ -90,10 +91,10 @@
throw new IllegalStateException("Cannot call this method on the server side");
}
SessionTxState sessionTxState = getSessionTxState(sessionId);
-
+
sessionTxState.addMessage(msg);
}
-
+
public void addAck(int sessionId, DeliveryInfo info)
{
if (!clientSide)
@@ -101,32 +102,29 @@
throw new IllegalStateException("Cannot call this method on the server side");
}
SessionTxState sessionTxState = getSessionTxState(sessionId);
-
+
sessionTxState.addAck(info);
- }
-
+ }
+
public void clearMessages()
{
if (!clientSide)
{
throw new IllegalStateException("Cannot call this method on the server side");
}
-
+
if (sessionStatesMap != null)
{
- //This can be null if the tx was recreated on the client side due to recovery
-
- Iterator iter = sessionStatesMap.values().iterator();
-
- while (iter.hasNext())
+ // This can be null if the tx was recreated on the client side due to recovery
+
+ for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
{
- SessionTxState sessionTxState = (SessionTxState)iter.next();
-
+ SessionTxState sessionTxState = (SessionTxState)i.next();
sessionTxState.clearMessages();
}
}
}
-
+
public void setState(byte state)
{
if (!clientSide)
@@ -135,7 +133,7 @@
}
this.state = state;
}
-
+
public List getSessionStates()
{
if (sessionStatesList != null)
@@ -144,15 +142,16 @@
}
else
{
- return sessionStatesMap == null ? Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+ return sessionStatesMap == null ?
+ Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
}
- }
-
+ }
+
/*
- * Substitute newSessionID for oldSessionID
- */
+ * Substitute newSessionID for oldSessionID
+ */
public void handleFailover(int newServerID, int oldSessionID, int newSessionID)
- {
+ {
if (!clientSide)
{
throw new IllegalStateException("Cannot call this method on the server side");
@@ -160,15 +159,30 @@
// Note we have to do this in one go since there may be overlap between old and new session
// IDs and we don't want to overwrite keys in the map.
-
+
+ Map tmpMap = null;
+
if (sessionStatesMap != null)
- {
+ {
for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
{
+
SessionTxState state = (SessionTxState)i.next();
state.handleFailover(newServerID, oldSessionID, newSessionID);
+
+ if (tmpMap == null)
+ {
+ tmpMap = new LinkedHashMap();
+ }
+ tmpMap.put(new Integer(newSessionID), state);
}
}
+
+ if (tmpMap != null)
+ {
+ // swap
+ sessionStatesMap = tmpMap;
+ }
}
/**
@@ -182,7 +196,7 @@
}
SessionTxState state = getSessionTxState(sessionID);
-
+
if (state != null)
{
return state.getAcks();
@@ -192,9 +206,9 @@
return Collections.EMPTY_LIST;
}
}
-
+
// Streamable implementation ---------------------------------
-
+
public void write(DataOutputStream out) throws Exception
{
out.writeByte(state);
@@ -246,86 +260,84 @@
}
}
}
-
-
+
+
public void read(DataInputStream in) throws Exception
{
clientSide = false;
-
+
state = in.readByte();
-
+
int numSessions = in.readInt();
-
+
//Read in as a list since we don't want the extra overhead of putting into a map
//which won't be used on the server side
sessionStatesList = new ArrayList(numSessions);
-
+
for (int i = 0; i < numSessions; i++)
{
int sessionId = in.readInt();
-
+
SessionTxState sessionState = new SessionTxState(sessionId);
-
+
sessionStatesList.add(sessionState);
-
+
int numMsgs = in.readInt();
-
- for (int j = 0; j < numMsgs; j++)
+
+ for (int j = 0; j < numMsgs; j++)
{
byte type = in.readByte();
-
+
JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
-
+
msg.read(in);
-
+
sessionState.addMessage(msg);
}
-
+
int numAcks = in.readInt();
-
- for (int j = 0; j < numAcks; j++)
+
+ for (int j = 0; j < numAcks; j++)
{
long ack = in.readLong();
-
+
sessionState.addAck(new DefaultAck(ack));
}
- }
+ }
}
-
+
// Protected -----------------------------------------------------
-
+
// Package Private -----------------------------------------------
-
+
// Private -------------------------------------------------------
-
- private SessionTxState getSessionTxState(int sessionId)
- {
+
+ private SessionTxState getSessionTxState(int sessionID)
+ {
if (sessionStatesMap == null)
{
sessionStatesMap = new LinkedHashMap();
}
-
- SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(new Integer(sessionId));
-
+
+ SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(new Integer(sessionID));
+
if (sessionTxState == null)
{
- sessionTxState = new SessionTxState(sessionId);
-
- sessionStatesMap.put(new Integer(sessionId), sessionTxState);
+ sessionTxState = new SessionTxState(sessionID);
+ sessionStatesMap.put(new Integer(sessionID), sessionTxState);
}
-
+
return sessionTxState;
}
-
-
+
// Inner Classes -------------------------------------------------
-
+
public class SessionTxState
{
private int sessionID;
- // We record the server id when doing failover to avoid overwriting the sesion ID again
- // if multiple connections fail on the same resource mamanger but fail onto old values of the
+ // We record the server id when doing failover to avoid overwriting the sesion ID again if
+ // multiple connections fail on the same resource mamanger but fail onto old values of the
// session ID. This prevents the ID being failed over more than once for the same server.
private int serverID = -1;
@@ -336,58 +348,58 @@
{
this.sessionID = sessionID;
}
-
+
void addMessage(JBossMessage msg)
{
msgs.add(msg);
}
-
+
void addAck(Ack ack)
{
acks.add(ack);
}
-
+
public List getMsgs()
{
return msgs;
}
-
+
public List getAcks()
{
return acks;
}
-
+
public int getSessionId()
{
return sessionID;
}
-
+
void handleFailover(int newServerID, int oldSessionID, int newSessionID)
{
if (sessionID == oldSessionID && serverID != newServerID)
- {
+ {
sessionID = newSessionID;
serverID = newServerID;
-
+
// Remove any non persistent acks
for(Iterator i = acks.iterator(); i.hasNext(); )
{
DeliveryInfo di = (DeliveryInfo)i.next();
-
+
if (!di.getMessageProxy().getMessage().isReliable())
{
+ if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
i.remove();
}
}
}
+ }
- }
-
void clearMessages()
{
msgs.clear();
}
-
+
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -119,7 +119,7 @@
public void acknowledge(Transaction tx) throws Throwable
{
- if (trace) { log.trace(this + " acknowledging delivery in tx:" + tx); }
+ if (trace) { log.trace(this + " acknowledging delivery in " + tx); }
observer.acknowledge(this, tx);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -479,7 +479,6 @@
{
binding = super.getBindingforChannelId(channelId);
}
- log.info("Returned " + binding);
return binding;
}
finally
@@ -2144,7 +2143,7 @@
}
else
{
- log.info(this + " has already a " + queueName + " queue so adding to failed map");
+ log.debug(this + " has already a " + queueName + " queue so adding to failed map");
}
// Create a new binding
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/jchannelfactory/MultiplexorJChannelFactory.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -31,8 +31,8 @@
*
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
* @version <tt>$Revision$</tt>
- * <p/>
- * $Id$
+ *
+ * $Id$
*/
public class MultiplexorJChannelFactory implements JChannelFactory
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -1047,11 +1047,275 @@
}
}
- // TODO add a test whose session has NON-TRANSACTED, PERSISTENT and NON-PERSISTENT ACKS.
+ public void testSessionWithAcknowledgmentsFailover() throws Exception
+ {
+ Connection conn = null;
- // TODO add a test whose session has TRANSACTED, PERSISTENT and NON-PERSISTENT ACKS.
+ try
+ {
+ // skip connection to node 0
+ conn = cf.createConnection();
+ conn.close();
+ // create a connection to node 1
+ conn = cf.createConnection();
+ conn.start();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // send 2 messages (one persistent and one non-persistent)
+
+ MessageProducer prod = session.createProducer(queue[1]);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod.send(session.createTextMessage("clik-persistent"));
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.send(session.createTextMessage("clak-non-persistent"));
+
+ // close the producer
+ prod.close();
+
+ // create a consumer and receive messages, but don't acknowledge
+
+ MessageConsumer cons = session.createConsumer(queue[1]);
+ TextMessage clik = (TextMessage)cons.receive(2000);
+ assertEquals("clik-persistent", clik.getText());
+ TextMessage clak = (TextMessage)cons.receive(2000);
+ assertEquals("clak-non-persistent", clak.getText());
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ assertEquals(0, ((JBossConnection)conn).getServerID());
+
+ // acknowledge the messages
+ clik.acknowledge();
+ clak.acknowledge();
+
+ // make sure no messages are left in the queue
+ Message m = cons.receive(1000);
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testTransactedSessionWithAcknowledgmentsCommitOnFailover() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ // skip connection to node 0
+ conn = cf.createConnection();
+ conn.close();
+
+ // create a connection to node 1
+ conn = cf.createConnection();
+
+ conn.start();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ // send 2 messages (one persistent and one non-persistent)
+
+ MessageProducer prod = session.createProducer(queue[1]);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod.send(session.createTextMessage("clik-persistent"));
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.send(session.createTextMessage("clak-non-persistent"));
+
+ session.commit();
+
+ // close the producer
+ prod.close();
+
+ // create a consumer and receive messages, but don't acknowledge
+
+ MessageConsumer cons = session.createConsumer(queue[1]);
+ TextMessage clik = (TextMessage)cons.receive(2000);
+ assertEquals("clik-persistent", clik.getText());
+ TextMessage clak = (TextMessage)cons.receive(2000);
+ assertEquals("clak-non-persistent", clak.getText());
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ assertEquals(0, ((JBossConnection)conn).getServerID());
+
+ // acknowledge the messages
+ session.commit();
+
+ // make sure no messages are left in the queue
+ Message m = cons.receive(1000);
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testTransactedSessionWithAcknowledgmentsRollbackOnFailover() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ // skip connection to node 0
+ conn = cf.createConnection();
+ conn.close();
+
+ // create a connection to node 1
+ conn = cf.createConnection();
+
+ conn.start();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ // send 2 messages (one persistent and one non-persistent)
+
+ MessageProducer prod = session.createProducer(queue[1]);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod.send(session.createTextMessage("clik-persistent"));
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.send(session.createTextMessage("clak-non-persistent"));
+
+ session.commit();
+
+ // close the producer
+ prod.close();
+
+ // create a consumer and receive messages, but don't acknowledge
+
+ MessageConsumer cons = session.createConsumer(queue[1]);
+ TextMessage clik = (TextMessage)cons.receive(2000);
+ assertEquals("clik-persistent", clik.getText());
+ TextMessage clak = (TextMessage)cons.receive(2000);
+ assertEquals("clak-non-persistent", clak.getText());
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ assertEquals(0, ((JBossConnection)conn).getServerID());
+
+ session.rollback();
+
+ TextMessage m = (TextMessage)cons.receive(2000);
+ assertNotNull(m);
+ assertEquals("clik-persistent", m.getText());
+
+ m = (TextMessage)cons.receive(2000);
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
public void testFailoverListener() throws Exception
{
Connection conn = null;
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-01-06 06:45:30 UTC (rev 1912)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2007-01-06 09:18:24 UTC (rev 1913)
@@ -317,7 +317,7 @@
log.info("Could find multiplexer config");
}
- MBeanConfigurationElement multiplexerConfig = (MBeanConfigurationElement) services.iterator().next();
+ MBeanConfigurationElement multiplexerConfig = (MBeanConfigurationElement)services.iterator().next();
ObjectName nameMultiplexer = sc.registerAndConfigureService(multiplexerConfig);
sc.invoke(nameMultiplexer,"create", new Object[0], new String[0]);
sc.invoke(nameMultiplexer,"start", new Object[0], new String[0]);
More information about the jboss-cvs-commits mailing list