[jboss-cvs] JBoss Messaging SVN: r2196 - in trunk: src/main/org/jboss/jms/client/container and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Feb 6 14:02:08 EST 2007
Author: clebert.suconic at jboss.com
Date: 2007-02-06 14:02:08 -0500 (Tue, 06 Feb 2007)
New Revision: 2196
Modified:
trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java
trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-809 - fix & testcases
Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2007-02-06 19:02:08 UTC (rev 2196)
@@ -40,6 +40,7 @@
UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?
+SELECT_EXISTS_REF_MESSAGEID=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID = ?
UPDATE_DELIVERYCOUNT=UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?
LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -14,6 +14,8 @@
import org.jboss.jms.client.FailureDetector;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.HierarchicalState;
@@ -116,33 +118,36 @@
remotingConnection = fcc.getRemotingConnection();
return invocation.invokeNext();
}
-// catch (CannotConnectException e)
-// {
-// log.debug(this + " putting " + methodName + "() on hold until failover completes");
-//
-// fcc.failureDetected(e, this, remotingConnection);
-//
-// log.debug(this + " resuming " + methodName + "()");
-// return invocation.invokeNext();
-// }
-// catch (IOException e)
-// {
-// log.debug(this + " putting " + methodName + "() on hold until failover completes");
-//
-// fcc.failureDetected(e, this, remotingConnection);
-//
-// log.debug(this + " resuming " + methodName + "()");
-// return invocation.invokeNext();
-// }
catch (MessagingNetworkFailureException e)
{
log.debug(this + " putting " + methodName + "() on hold until failover completes");
- log.info("********** CAUGHT NETWOEK FAILURE");
+ log.info("********** CAUGHT NETWORK FAILURE");
fcc.failureDetected(e, this, remotingConnection);
log.debug(this + " resuming " + methodName + "()");
+
+ Object target = invocation.getTargetObject();
+
+ if (methodName.equals("send") &&
+ target instanceof ClientSessionDelegate)
+ {
+ log.debug("#### Capturing send invocation.. setting retry to true");
+ Object[] arguments = ((MethodInvocation)invocation).getArguments();
+ arguments[1] = Boolean.TRUE;
+ ((MethodInvocation)invocation).setArguments(arguments);
+ }
+ else
+ if (methodName.equals("sendTransaction") &&
+ target instanceof ClientConnectionDelegate)
+ {
+ log.debug("#### Capturing sendTransaction invocation.. setting retry to true");
+ Object[] arguments = ((MethodInvocation)invocation).getArguments();
+ arguments[1] = Boolean.TRUE;
+ ((MethodInvocation)invocation).setArguments(arguments);
+ }
+
return invocation.invokeNext();
}
catch (Throwable e)
Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -250,7 +250,7 @@
// we now invoke the send(Message) method on the session, which will eventually be fielded
// by connection endpoint
- ((SessionDelegate)sessionState.getDelegate()).send(messageToSend);
+ ((SessionDelegate)sessionState.getDelegate()).send(messageToSend, false);
return null;
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -206,9 +206,9 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- public void sendTransaction(TransactionRequest request) throws JMSException
+ public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
{
- RequestSupport req = new ConnectionSendTransactionRequest(id, version, request);
+ RequestSupport req = new ConnectionSendTransactionRequest(id, version, request, retry);
doInvoke(client, req);
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -430,9 +430,9 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- public void send(JBossMessage m) throws JMSException
+ public void send(JBossMessage m, boolean retry) throws JMSException
{
- RequestSupport req = new SessionSendRequest(id, version, m);
+ RequestSupport req = new SessionSendRequest(id, version, m, retry);
doInvoke(client, req);
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -53,7 +53,7 @@
void stop() throws JMSException;
- void sendTransaction(TransactionRequest request) throws JMSException;
+ void sendTransaction(TransactionRequest request, boolean retry) throws JMSException;
MessagingXid[] getPreparedTransactions() throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -418,7 +418,7 @@
log.trace(this + " closing (noop)");
}
- public void sendTransaction(TransactionRequest request) throws JMSException
+ public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
{
try
{
@@ -432,7 +432,7 @@
if (trace) { log.trace(this + " received ONE_PHASE_COMMIT request"); }
Transaction tx = tr.createTransaction();
- processTransaction(request.getState(), tx);
+ processTransaction(request.getState(), tx, retry);
tx.commit();
}
else if (request.getRequestType() == TransactionRequest.TWO_PHASE_PREPARE_REQUEST)
@@ -440,7 +440,7 @@
if (trace) { log.trace(this + " received TWO_PHASE_COMMIT prepare request"); }
Transaction tx = tr.createTransaction(request.getXid());
- processTransaction(request.getState(), tx);
+ processTransaction(request.getState(), tx, retry);
tx.prepare();
}
else if (request.getRequestType() == TransactionRequest.TWO_PHASE_COMMIT_REQUEST)
@@ -607,7 +607,7 @@
return remotingClientSessionID;
}
- void sendMessage(JBossMessage msg, Transaction tx) throws Exception
+ void sendMessage(JBossMessage msg, Transaction tx, boolean retry) throws Exception
{
JBossDestination dest = (JBossDestination)msg.getJMSDestination();
@@ -617,6 +617,15 @@
// TODO Do we want to set this for ALL messages. Optimisation is possible here.
msg.setConnectionID(id);
+ if (retry)
+ {
+ // Message is already stored... so just ignore the call
+ if (serverPeer.getPersistenceManagerInstance().referenceExists(msg.getMessageID()))
+ {
+ return;
+ }
+ }
+
// messages arriving over a failed-over connections will be give preferential treatment by
// routers, which will send them directly to their corresponding failover queues, not to
// the "local" queues, to reduce clutter and unnecessary "pull policy" revving.
@@ -695,7 +704,8 @@
}
}
- private void processTransaction(ClientTransaction txState, Transaction tx) throws Throwable
+ private void processTransaction(ClientTransaction txState,
+ Transaction tx, boolean retry) throws Throwable
{
if (trace) { log.trace(this + " processing transaction " + tx); }
@@ -709,7 +719,7 @@
for (Iterator j = sessionState.getMsgs().iterator(); j.hasNext(); )
{
- sendMessage((JBossMessage)j.next(), tx);
+ sendMessage((JBossMessage)j.next(), tx, retry);
}
// send the acks
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -310,11 +310,11 @@
if (trace) log.trace(this + " closing (noop)");
}
- public void send(JBossMessage message) throws JMSException
+ public void send(JBossMessage message, boolean retry) throws JMSException
{
try
{
- connectionEndpoint.sendMessage(message, null);
+ connectionEndpoint.sendMessage(message, null, retry);
}
catch (Throwable t)
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -123,7 +123,7 @@
* @param message The message to send
* @throws JMSException
*/
- void send(JBossMessage message) throws JMSException;
+ void send(JBossMessage message, boolean retry) throws JMSException;
/**
* Send delivery info to the server so the delivery lists can be repopulated. Used only in
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -94,9 +94,9 @@
endpoint.stop();
}
- public void sendTransaction(TransactionRequest request) throws JMSException
+ public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
{
- endpoint.sendTransaction(request);
+ endpoint.sendTransaction(request, retry);
}
public MessagingXid[] getPreparedTransactions() throws JMSException
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -75,9 +75,9 @@
endpoint.closing();
}
- public void send(JBossMessage msg) throws JMSException
+ public void send(JBossMessage msg, boolean retry) throws JMSException
{
- endpoint.send(msg);
+ endpoint.send(msg, retry);
}
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -195,7 +195,7 @@
try
{
- connection.sendTransaction(request);
+ connection.sendTransaction(request, false);
// If we get this far we can remove the transaction
@@ -616,7 +616,7 @@
{
try
{
- connection.sendTransaction(request);
+ connection.sendTransaction(request, false);
}
catch (Throwable t)
{
Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -40,6 +40,7 @@
public class ConnectionSendTransactionRequest extends RequestSupport
{
private TransactionRequest req;
+ private boolean retry;
public ConnectionSendTransactionRequest()
{
@@ -47,11 +48,13 @@
public ConnectionSendTransactionRequest(int objectId,
byte version,
- TransactionRequest req)
+ TransactionRequest req,
+ boolean retry)
{
super(objectId, PacketSupport.REQ_CONNECTION_SENDTRANSACTION, version);
this.req = req;
+ this.retry = retry;
}
public void read(DataInputStream is) throws Exception
@@ -61,6 +64,8 @@
req = new TransactionRequest();
req.read(is);
+
+ retry = is.readBoolean();
}
public ResponseSupport serverInvoke() throws Exception
@@ -73,7 +78,7 @@
throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
}
- endpoint.sendTransaction(req);
+ endpoint.sendTransaction(req, retry);
return null;
}
@@ -83,6 +88,8 @@
super.write(os);
req.write(os);
+
+ os.writeBoolean(retry);
os.flush();
}
Modified: trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -42,6 +42,7 @@
public class SessionSendRequest extends RequestSupport
{
private JBossMessage msg;
+ private boolean retry;
public SessionSendRequest()
{
@@ -49,11 +50,12 @@
public SessionSendRequest(int objectId,
byte version,
- JBossMessage msg)
+ JBossMessage msg,
+ boolean retry)
{
super(objectId, PacketSupport.REQ_SESSION_SEND, version);
-
this.msg = msg;
+ this.retry = retry;
}
public void read(DataInputStream is) throws Exception
@@ -64,7 +66,9 @@
msg = (JBossMessage)MessageFactory.createMessage(messageType);
- msg.read(is);
+ msg.read(is);
+
+ retry = is.readBoolean();
}
public ResponseSupport serverInvoke() throws Exception
@@ -76,9 +80,9 @@
{
throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
}
+
+ endpoint.send(msg, retry);
- endpoint.send(msg);
-
return null;
}
@@ -89,6 +93,8 @@
os.writeByte(msg.getType());
msg.write(os);
+
+ os.writeBoolean(retry);
os.flush();
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -1715,17 +1715,17 @@
PreparedStatement st = null;
ResultSet rs = null;
TransactionWrapper wrap = new TransactionWrapper();
-
+
try
{
conn = ds.getConnection();
-
+
st = conn.prepareStatement(getSQLStatement("SELECT_EXISTS_REF"));
st.setLong(1, channelID);
st.setLong(2, messageID);
-
+
rs = st.executeQuery();
-
+
if (rs.next())
{
return true;
@@ -1775,7 +1775,73 @@
wrap.end();
}
}
-
+
+ public boolean referenceExists(long messageID) throws Exception
+ {
+ Connection conn = null;
+ PreparedStatement st = null;
+ ResultSet rs = null;
+ TransactionWrapper wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ st = conn.prepareStatement(getSQLStatement("SELECT_EXISTS_REF_MESSAGEID"));
+ st.setLong(1, messageID);
+
+ rs = st.executeQuery();
+
+ if (rs.next())
+ {
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ if (rs != null)
+ {
+ try
+ {
+ rs.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ if (st != null)
+ {
+ try
+ {
+ st.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ wrap.end();
+ }
+ }
+
// Public --------------------------------------------------------
public String toString()
@@ -3330,6 +3396,7 @@
map.put("UPDATE_RELIABLE_REFS_NOT_PAGED", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?");
map.put("SELECT_MIN_MAX_PAGE_ORD", "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?");
map.put("SELECT_EXISTS_REF", "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
+ map.put("SELECT_EXISTS_REF_MESSAGEID", "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID = ?");
map.put("UPDATE_DELIVERYCOUNT", "UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?");
//Message
map.put("LOAD_MESSAGES",
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -80,7 +80,11 @@
// Clustering recovery related functionality
boolean referenceExists(long channelID, long messageID) throws Exception;
-
+
+ // Failover related functionality (retry on send)
+
+ boolean referenceExists(long messageID) throws Exception;
+
// Interface value classes
//---------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -516,7 +516,7 @@
TransactionRequest tr = new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
RequestSupport req =
- new ConnectionSendTransactionRequest(23, (byte)77, tr);
+ new ConnectionSendTransactionRequest(23, (byte)77, tr, false);
testPacket(req, PacketSupport.REQ_CONNECTION_SENDTRANSACTION);
}
@@ -636,7 +636,7 @@
JBossMessage msg = new JBossMessage(123);
RequestSupport req =
- new SessionSendRequest(23, (byte)77, msg);
+ new SessionSendRequest(23, (byte)77, msg, false);
testPacket(req, PacketSupport.REQ_SESSION_SEND);
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -1687,6 +1687,79 @@
failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
}
+ public void testFailureRightAFterSendTransaction() throws Exception
+ {
+ Connection conn = null;
+ Connection conn0 = null;
+
+ try
+ {
+ conn0 = cf.createConnection();
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer consumer0 = session0.createConsumer(queue[0]);
+
+ conn0.start();
+
+ conn = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+ // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+ JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+ getDelegate()).getRemotingConnection();
+ rc.removeConnectionListener();
+
+ // poison the server
+ ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
+
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ conn.start();
+
+ MessageProducer producer = session.createProducer(queue[0]);
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ MessageConsumer consumer = session.createConsumer(queue[0]);
+
+ producer.send(session.createTextMessage("before-poison1"));
+ producer.send(session.createTextMessage("before-poison2"));
+ producer.send(session.createTextMessage("before-poison3"));
+ session.commit();
+
+ Thread.sleep(2000);
+
+ for (int i = 1; i <= 3; i++)
+ {
+ TextMessage tm = (TextMessage) consumer.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("before-poison" + i, tm.getText());
+ }
+
+ assertNull(consumer.receive(1000));
+ assertNull(consumer0.receive(5000));
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ }
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -295,7 +295,7 @@
return null;
}
- public void sendTransaction(TransactionRequest request) throws JMSException
+ public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
{
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java 2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java 2007-02-06 19:02:08 UTC (rev 2196)
@@ -48,6 +48,8 @@
public static final int FAIL_SYNCHRONIZED_SEND_RECEIVE = 6;
+ public static final int FAIL_AFTER_SENDTRANSACTION = 7;
+
// Static ---------------------------------------------------------------------------------------
private static int type;
@@ -98,6 +100,14 @@
crash(target);
}
+ else
+ if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST &&
+ type == FAIL_AFTER_SENDTRANSACTION)
+ {
+ invocation.invokeNext();
+ log.info("#### Crash after sendTransaction");
+ crash(target);
+ }
}
else if (target instanceof SessionAdvised && "acknowledgeDelivery".equals(methodName)
&& type == FAIL_AFTER_ACKNOWLEDGE_DELIVERY)
More information about the jboss-cvs-commits
mailing list