[jboss-cvs] JBoss Messaging SVN: r2196 - in trunk: src/main/org/jboss/jms/client/container and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Feb 6 14:02:08 EST 2007


Author: clebert.suconic at jboss.com
Date: 2007-02-06 14:02:08 -0500 (Tue, 06 Feb 2007)
New Revision: 2196

Modified:
   trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java
   trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-809 - fix & testcases

Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2007-02-06 19:02:08 UTC (rev 2196)
@@ -40,6 +40,7 @@
 UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
 SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
 SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?
+SELECT_EXISTS_REF_MESSAGEID=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID = ?
 UPDATE_DELIVERYCOUNT=UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?
 LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -14,6 +14,8 @@
 import org.jboss.jms.client.FailureDetector;
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.HierarchicalState;
@@ -116,33 +118,36 @@
          remotingConnection = fcc.getRemotingConnection();
          return invocation.invokeNext();
       }
-//      catch (CannotConnectException e)
-//      {
-//         log.debug(this + " putting " + methodName + "() on hold until failover completes");
-//
-//         fcc.failureDetected(e, this, remotingConnection);
-//
-//         log.debug(this + " resuming " + methodName + "()");
-//         return invocation.invokeNext();
-//      }
-//      catch (IOException e)
-//      {
-//         log.debug(this + " putting " + methodName + "() on hold until failover completes");
-//
-//         fcc.failureDetected(e, this, remotingConnection);
-//
-//         log.debug(this + " resuming " + methodName + "()");
-//         return invocation.invokeNext();
-//      }
       catch (MessagingNetworkFailureException e)
       {
          log.debug(this + " putting " + methodName + "() on hold until failover completes");
          
-         log.info("********** CAUGHT NETWOEK FAILURE");
+         log.info("********** CAUGHT NETWORK FAILURE");
          
          fcc.failureDetected(e, this, remotingConnection);
          
          log.debug(this + " resuming " + methodName + "()");
+
+         Object target = invocation.getTargetObject();
+         
+         if (methodName.equals("send") &&
+             target instanceof ClientSessionDelegate)
+         {
+            log.debug("#### Capturing send invocation.. setting retry to true");
+            Object[] arguments = ((MethodInvocation)invocation).getArguments();
+            arguments[1] = Boolean.TRUE;
+            ((MethodInvocation)invocation).setArguments(arguments);
+         }
+         else
+         if (methodName.equals("sendTransaction") &&
+             target instanceof ClientConnectionDelegate)
+         {
+            log.debug("#### Capturing sendTransaction invocation.. setting retry to true");
+            Object[] arguments = ((MethodInvocation)invocation).getArguments();
+            arguments[1] = Boolean.TRUE;
+            ((MethodInvocation)invocation).setArguments(arguments);
+         }
+
          return invocation.invokeNext();
       }      
       catch (Throwable e)

Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -250,7 +250,7 @@
               
       // we now invoke the send(Message) method on the session, which will eventually be fielded
       // by connection endpoint
-      ((SessionDelegate)sessionState.getDelegate()).send(messageToSend);
+      ((SessionDelegate)sessionState.getDelegate()).send(messageToSend, false);
       
       return null;
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -206,9 +206,9 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   public void sendTransaction(TransactionRequest request) throws JMSException
+   public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
    {
-      RequestSupport req = new ConnectionSendTransactionRequest(id, version, request);
+      RequestSupport req = new ConnectionSendTransactionRequest(id, version, request, retry);
       
       doInvoke(client, req);
    }

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -430,9 +430,9 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
-   public void send(JBossMessage m) throws JMSException
+   public void send(JBossMessage m, boolean retry) throws JMSException
    {
-      RequestSupport req = new SessionSendRequest(id, version, m);
+      RequestSupport req = new SessionSendRequest(id, version, m, retry);
 
       doInvoke(client, req);
    }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -53,7 +53,7 @@
 
    void stop() throws JMSException;
 
-   void sendTransaction(TransactionRequest request) throws JMSException;
+   void sendTransaction(TransactionRequest request, boolean retry) throws JMSException;
 
    MessagingXid[] getPreparedTransactions() throws JMSException; 
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -418,7 +418,7 @@
       log.trace(this + " closing (noop)");    
    }
 
-   public void sendTransaction(TransactionRequest request) throws JMSException
+   public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
    {    
       try
       {      
@@ -432,7 +432,7 @@
             if (trace) { log.trace(this + " received ONE_PHASE_COMMIT request"); }
             
             Transaction tx = tr.createTransaction();
-            processTransaction(request.getState(), tx);
+            processTransaction(request.getState(), tx, retry);
             tx.commit();
          }        
          else if (request.getRequestType() == TransactionRequest.TWO_PHASE_PREPARE_REQUEST)
@@ -440,7 +440,7 @@
             if (trace) { log.trace(this + " received TWO_PHASE_COMMIT prepare request"); }
             
             Transaction tx = tr.createTransaction(request.getXid());
-            processTransaction(request.getState(), tx);     
+            processTransaction(request.getState(), tx, retry);
             tx.prepare();            
          }
          else if (request.getRequestType() == TransactionRequest.TWO_PHASE_COMMIT_REQUEST)
@@ -607,7 +607,7 @@
       return remotingClientSessionID;
    }
    
-   void sendMessage(JBossMessage msg, Transaction tx) throws Exception
+   void sendMessage(JBossMessage msg, Transaction tx, boolean retry) throws Exception
    {
       JBossDestination dest = (JBossDestination)msg.getJMSDestination();
       
@@ -617,6 +617,15 @@
       // TODO Do we want to set this for ALL messages. Optimisation is possible here.
       msg.setConnectionID(id);
 
+      if (retry)
+      {
+         // Message is already stored... so just ignoring the call
+         if (serverPeer.getPersistenceManagerInstance().referenceExists(msg.getMessageID()))
+         {
+            return;
+         }
+      }
+
       // messages arriving over a failed-over connections will be give preferential treatment by
       // routers, which will send them directly to their corresponding failover queues, not to
       // the "local" queues, to reduce clutter and unnecessary "pull policy" revving.
@@ -695,7 +704,8 @@
       }
    }   
     
-   private void processTransaction(ClientTransaction txState, Transaction tx) throws Throwable
+   private void processTransaction(ClientTransaction txState,
+                                   Transaction tx, boolean retry) throws Throwable
    {
       if (trace) { log.trace(this + " processing transaction " + tx); }
          
@@ -709,7 +719,7 @@
             
             for (Iterator j = sessionState.getMsgs().iterator(); j.hasNext(); )
             {
-               sendMessage((JBossMessage)j.next(), tx);
+               sendMessage((JBossMessage)j.next(), tx, retry);
             }
 
             // send the acks

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -310,11 +310,11 @@
       if (trace) log.trace(this + " closing (noop)");
    }
  
-   public void send(JBossMessage message) throws JMSException
+   public void send(JBossMessage message, boolean retry) throws JMSException
    {
       try
       {       
-         connectionEndpoint.sendMessage(message, null);
+         connectionEndpoint.sendMessage(message, null, retry);
       }
       catch (Throwable t)
       {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -123,7 +123,7 @@
     * @param message The message to send
     * @throws JMSException
     */
-   void send(JBossMessage message) throws JMSException;
+   void send(JBossMessage message, boolean retry) throws JMSException;
    
    /**
     * Send delivery info to the server so the delivery lists can be repopulated. Used only in

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionAdvised.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -94,9 +94,9 @@
       endpoint.stop();
    }
 
-   public void sendTransaction(TransactionRequest request) throws JMSException
+   public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
    {
-      endpoint.sendTransaction(request);
+      endpoint.sendTransaction(request, retry);
    }
 
    public MessagingXid[] getPreparedTransactions() throws JMSException

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -75,9 +75,9 @@
       endpoint.closing();
    }
 
-   public void send(JBossMessage msg) throws JMSException
+   public void send(JBossMessage msg, boolean retry) throws JMSException
    {
-      endpoint.send(msg);
+      endpoint.send(msg, retry);
    }
    
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -195,7 +195,7 @@
       
       try
       {
-         connection.sendTransaction(request);
+         connection.sendTransaction(request, false);
          
          // If we get this far we can remove the transaction
          
@@ -616,7 +616,7 @@
    {
       try
       {
-         connection.sendTransaction(request);
+         connection.sendTransaction(request, false);
       }
       catch (Throwable t)
       {

Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionSendTransactionRequest.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -40,6 +40,7 @@
 public class ConnectionSendTransactionRequest extends RequestSupport
 {
    private TransactionRequest req;
+   private boolean retry;
    
    public ConnectionSendTransactionRequest()
    {      
@@ -47,11 +48,13 @@
    
    public ConnectionSendTransactionRequest(int objectId,
                                            byte version,
-                                           TransactionRequest req)
+                                           TransactionRequest req,
+                                           boolean retry)
    {
       super(objectId, PacketSupport.REQ_CONNECTION_SENDTRANSACTION, version);
       
       this.req = req;
+      this.retry = retry;
    }
 
    public void read(DataInputStream is) throws Exception
@@ -61,6 +64,8 @@
       req = new TransactionRequest();
       
       req.read(is);
+
+      retry = is.readBoolean();
    }
 
    public ResponseSupport serverInvoke() throws Exception
@@ -73,7 +78,7 @@
          throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
       }
       
-      endpoint.sendTransaction(req);
+      endpoint.sendTransaction(req, retry);
       
       return null;
    }
@@ -83,6 +88,8 @@
       super.write(os);
       
       req.write(os);
+
+      os.writeBoolean(retry);
       
       os.flush();
    }

Modified: trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionSendRequest.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -42,6 +42,7 @@
 public class SessionSendRequest extends RequestSupport
 {
    private JBossMessage msg;
+   private boolean retry;
    
    public SessionSendRequest()
    {      
@@ -49,11 +50,12 @@
    
    public SessionSendRequest(int objectId,
                              byte version,
-                             JBossMessage msg)
+                             JBossMessage msg,
+                             boolean retry)
    {
       super(objectId, PacketSupport.REQ_SESSION_SEND, version);
-      
       this.msg = msg;
+      this.retry = retry;
    }
 
    public void read(DataInputStream is) throws Exception
@@ -64,7 +66,9 @@
       
       msg = (JBossMessage)MessageFactory.createMessage(messageType);
 
-      msg.read(is);   
+      msg.read(is);
+
+      retry = is.readBoolean();
    }
 
    public ResponseSupport serverInvoke() throws Exception
@@ -76,9 +80,9 @@
       {
          throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
       }
+
+      endpoint.send(msg, retry);
       
-      endpoint.send(msg);
-      
       return null;
    }
 
@@ -89,6 +93,8 @@
       os.writeByte(msg.getType());
       
       msg.write(os);
+
+      os.writeBoolean(retry);
       
       os.flush();
    }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -1715,17 +1715,17 @@
       PreparedStatement st = null;
       ResultSet rs = null;
       TransactionWrapper wrap = new TransactionWrapper();
-      
+
       try
       {
          conn = ds.getConnection();
-         
+
          st = conn.prepareStatement(getSQLStatement("SELECT_EXISTS_REF"));
          st.setLong(1, channelID);
          st.setLong(2, messageID);
-         
+
          rs = st.executeQuery();
-         
+
          if (rs.next())
          {
             return true;
@@ -1775,7 +1775,73 @@
          wrap.end();
       }
    }
-   
+
+   public boolean referenceExists(long messageID) throws Exception
+   {
+      Connection conn = null;
+      PreparedStatement st = null;
+      ResultSet rs = null;
+      TransactionWrapper wrap = new TransactionWrapper();
+
+      try
+      {
+         conn = ds.getConnection();
+
+         st = conn.prepareStatement(getSQLStatement("SELECT_EXISTS_REF_MESSAGEID"));
+         st.setLong(1, messageID);
+
+         rs = st.executeQuery();
+
+         if (rs.next())
+         {
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }
+      catch (Exception e)
+      {
+         wrap.exceptionOccurred();
+         throw e;
+      }
+      finally
+      {
+         if (rs != null)
+         {
+            try
+            {
+               rs.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         if (st != null)
+         {
+            try
+            {
+               st.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         wrap.end();
+      }
+   }
+
    // Public --------------------------------------------------------
    
    public String toString()
@@ -3330,6 +3396,7 @@
       map.put("UPDATE_RELIABLE_REFS_NOT_PAGED", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?");       
       map.put("SELECT_MIN_MAX_PAGE_ORD", "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?");
       map.put("SELECT_EXISTS_REF", "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
+      map.put("SELECT_EXISTS_REF_MESSAGEID", "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID = ?");
       map.put("UPDATE_DELIVERYCOUNT", "UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?");
       //Message
       map.put("LOAD_MESSAGES",

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -80,7 +80,11 @@
    // Clustering recovery related functionality
    
    boolean referenceExists(long channelID, long messageID) throws Exception;
-     
+
+   // Failover elated functionality (retry on send)
+
+   boolean referenceExists(long messageID) throws Exception;
+
    // Interface value classes
    //---------------------------------------------------------------
    

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -516,7 +516,7 @@
          TransactionRequest tr = new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
          
          RequestSupport req =
-            new ConnectionSendTransactionRequest(23, (byte)77, tr);
+            new ConnectionSendTransactionRequest(23, (byte)77, tr, false);
                  
          testPacket(req, PacketSupport.REQ_CONNECTION_SENDTRANSACTION);                           
       }      
@@ -636,7 +636,7 @@
          JBossMessage msg = new JBossMessage(123);
          
          RequestSupport req =
-            new SessionSendRequest(23, (byte)77, msg);
+            new SessionSendRequest(23, (byte)77, msg, false);
                  
          testPacket(req, PacketSupport.REQ_SESSION_SEND);                           
       }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -1687,6 +1687,79 @@
       failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
    }
 
+   public void testFailureRightAFterSendTransaction() throws Exception
+   {
+      Connection conn = null;
+      Connection conn0 = null;
+
+      try
+      {
+         conn0 = cf.createConnection();
+
+         assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageConsumer consumer0 = session0.createConsumer(queue[0]);
+
+         conn0.start();
+
+         conn = cf.createConnection();
+
+         assertEquals(1, ((JBossConnection)conn).getServerID());
+
+         // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+         // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+         JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+            getDelegate()).getRemotingConnection();
+         rc.removeConnectionListener();
+
+         // poison the server
+         ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
+
+         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         conn.start();
+
+         MessageProducer producer = session.createProducer(queue[0]);
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         MessageConsumer consumer = session.createConsumer(queue[0]);
+
+         producer.send(session.createTextMessage("before-poison1"));
+         producer.send(session.createTextMessage("before-poison2"));
+         producer.send(session.createTextMessage("before-poison3"));
+         session.commit();
+
+         Thread.sleep(2000);
+
+         for (int i = 1; i <= 3; i++)
+         {
+            TextMessage tm = (TextMessage) consumer.receive(5000);
+
+            assertNotNull(tm);
+
+            assertEquals("before-poison" + i, tm.getText());
+         }
+
+         assertNull(consumer.receive(1000));
+         assertNull(consumer0.receive(5000));
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+      }
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -295,7 +295,7 @@
          return null;
       }
 
-      public void sendTransaction(TransactionRequest request) throws JMSException
+      public void sendTransaction(TransactionRequest request, boolean retry) throws JMSException
       {
       }
 

Modified: trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java	2007-02-06 18:09:22 UTC (rev 2195)
+++ trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java	2007-02-06 19:02:08 UTC (rev 2196)
@@ -48,6 +48,8 @@
 
    public static final int FAIL_SYNCHRONIZED_SEND_RECEIVE = 6;
 
+   public static final int FAIL_AFTER_SENDTRANSACTION = 7;
+
    // Static ---------------------------------------------------------------------------------------
    
    private static int type;
@@ -98,6 +100,14 @@
             
             crash(target);
          }
+         else
+         if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST &&
+             type == FAIL_AFTER_SENDTRANSACTION)
+         {
+            invocation.invokeNext();
+            log.info("#### Crash after sendTransaction");
+            crash(target);
+         }
       }
       else if (target instanceof SessionAdvised && "acknowledgeDelivery".equals(methodName)
                  && type == FAIL_AFTER_ACKNOWLEDGE_DELIVERY)




More information about the jboss-cvs-commits mailing list