[jboss-cvs] JBoss Messaging SVN: r3238 - in trunk: src/main/org/jboss/jms/client/delegate and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Oct 21 19:29:01 EDT 2007


Author: timfox
Date: 2007-10-21 19:29:00 -0400 (Sun, 21 Oct 2007)
New Revision: 3238

Added:
   trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java
Modified:
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
   trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java
   trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
   trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
Ack tweaks


Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -461,7 +461,12 @@
  
                   sessionDelegate.preDeliver(info);                  
                   
-                  sessionDelegate.postDeliver();                                    
+                  //If post deliver didn't succeed and acknowledgement mode is auto_ack,
+                  //that means the ref wasn't acked since it couldn't be found.
+                  //In order to maintain at-most-once semantics we must therefore not return
+                  //the message.
+                  
+                  ignore = !sessionDelegate.postDeliver();                                       
                }
                                              
                if (!ignore)

Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -12,6 +12,7 @@
 import org.jboss.jms.client.FailoverCommandCenter;
 import org.jboss.jms.client.FailoverValve2;
 import org.jboss.jms.client.FailureDetector;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -119,8 +120,8 @@
          // Set retry flag as true on send() and sendTransaction()
          // more details at http://jira.jboss.org/jira/browse/JBMESSAGING-809
 
-         if (invocation.getTargetObject() instanceof ClientSessionDelegate &&
-            (methodName.equals("send") || methodName.equals("sendTransaction")))
+         if ((invocation.getTargetObject() instanceof ClientSessionDelegate && methodName.equals("send")) ||
+         	(invocation.getTargetObject() instanceof ClientConnectionDelegate && methodName.equals("sendTransaction")))
          {
             log.trace(this + " caught " + methodName + "() invocation, enabling check for duplicates");
 

Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -305,6 +305,8 @@
       int ackMode = state.getAcknowledgeMode();
       
       SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
+      
+      boolean res = true;
 
       // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
       // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
@@ -333,7 +335,7 @@
             
             try
             {
-               ackDelivery(sd, delivery);
+               res = ackDelivery(sd, delivery);
             }
             finally
             {
@@ -375,11 +377,10 @@
 
             state.setRecoverCalled(false);
          }
-         state.setAutoAckInfo(null);
-                  
+         state.setAutoAckInfo(null);                  
       }
 
-      return null;
+      return Boolean.valueOf(res);
    }
    
    /**
@@ -829,7 +830,7 @@
       return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
    }
    
-   private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+   private boolean ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
    {
    	if (delivery.isShouldAck())
    	{
@@ -840,8 +841,12 @@
 	      
 	      SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
 	      
-	      sessionToUse.acknowledgeDelivery(delivery);
+	      return sessionToUse.acknowledgeDelivery(delivery);
    	}
+   	else
+   	{
+   		return false;
+   	}
    }
    
    private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -161,11 +161,11 @@
 
    // SessionDelegate implementation ---------------------------------------------------------------
 
-   public void acknowledgeDelivery(Ack ack) throws JMSException
+   public boolean acknowledgeDelivery(Ack ack) throws JMSException
    {
       RequestSupport req = new SessionAcknowledgeDeliveryRequest(id, version, ack);
 
-      doInvoke(client, req);
+      return ((Boolean)doInvoke(client, req)).booleanValue();
    }
 
    public void acknowledgeDeliveries(List acks) throws JMSException
@@ -347,7 +347,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void postDeliver() throws JMSException
+   public boolean postDeliver() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -66,7 +66,7 @@
 
    void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
 
-   void postDeliver() throws JMSException;
+   boolean postDeliver() throws JMSException;
 
    MessageListener getMessageListener() throws JMSException;
 

Modified: trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -73,7 +73,7 @@
     * Acknowledge a delivery
     * @throws JMSException
     */
-   void acknowledgeDelivery(Ack ack) throws JMSException;
+   boolean acknowledgeDelivery(Ack ack) throws JMSException;
    
    /**
     * Cancel a list of deliveries.

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -651,6 +651,16 @@
    {
       if (trace) { log.trace(this + " sending message " + msg + (tx == null ? " non-transactionally" : " in " + tx)); }
 
+      if (checkForDuplicates && msg.isReliable())
+      {
+         // Message is already stored... so just ignoring the call
+         if (serverPeer.getPersistenceManagerInstance().referenceExists(msg.getMessageID()))
+         {
+         	if (trace) { log.trace("Duplicate of " + msg + " exists in database - probably sent before failover"); }
+            return;
+         }
+      }
+      
       JBossDestination dest = (JBossDestination)msg.getJMSDestination();
       
       // This allows the no-local consumers to filter out the messages that come from the same
@@ -659,14 +669,6 @@
       // TODO Do we want to set this for ALL messages. Optimisation is possible here.
       msg.setConnectionID(id);
 
-      if (checkForDuplicates)
-      {
-         // Message is already stored... so just ignoring the call
-         if (serverPeer.getPersistenceManagerInstance().referenceExists(msg.getMessageID()))
-         {
-            return;
-         }
-      }
 
       // We must reference the message *before* we send it the destination to be handled. This is
       // so we can guarantee that the message doesn't disappear from the store before the
@@ -763,10 +765,6 @@
    {
       if (trace) { log.trace(this + " processing transaction " + tx); }
 
-      // used on checkForDuplicates...
-      // we only check the first iteration
-      boolean firstIteration = true;
-        
       for (Iterator i = txState.getSessionStates().iterator(); i.hasNext(); )
       {
          SessionTxState sessionState = (SessionTxState)i.next();
@@ -776,19 +774,8 @@
          for (Iterator j = sessionState.getMsgs().iterator(); j.hasNext(); )
          {
             JBossMessage message = (JBossMessage)j.next();
-            if (checkForDuplicates && firstIteration)
-            {
-               firstIteration = false;
-               if (serverPeer.getPersistenceManagerInstance().
-                  referenceExists(message.getMessageID()))
-               {
-                  // This means the transaction was previously completed...
-                  // we are done here then... no need to even check for ACKs or anything else
-                  log.trace("Transaction " + tx + " was previously completed, ignoring call");
-                  return;
-               }
-            }
-            sendMessage(message, tx, false);
+
+            sendMessage(message, tx, checkForDuplicates);
          }
 
          // send the acks

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -93,7 +93,6 @@
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
 
 /**
@@ -399,7 +398,7 @@
 	      		{
 	      			do
 	      			{
-	      				connectionEndpoint.sendMessage(message, null, false); 
+	      				connectionEndpoint.sendMessage(message, null, checkForDuplicates); 
 	      				
 	         			expectedSequence++;
 	         			
@@ -428,11 +427,11 @@
       }
    }
    
-   public void acknowledgeDelivery(Ack ack) throws JMSException
+   public boolean acknowledgeDelivery(Ack ack) throws JMSException
    {
       try
       {
-         acknowledgeDeliveryInternal(ack);   
+         return acknowledgeDeliveryInternal(ack);   
       }
       catch (Throwable t)
       {
@@ -1740,16 +1739,17 @@
       }
    }
    
-   private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
+   private boolean acknowledgeDeliveryInternal(Ack ack) throws Throwable
    {
       if (trace) { log.trace(this + " acknowledging delivery " + ack); }
-      
+
       DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
       
       if (rec == null)
       {
+      	//This can happen if an ack comes in after failover
          log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
-         return;
+         return false;
       }
       
       rec.del.acknowledge(null);  
@@ -1762,6 +1762,8 @@
       }
       
       if (trace) { log.trace(this + " acknowledged delivery " + ack); }
+      
+      return true;
    }
    
    /* TODO We can combine this with createConsumerDelegateInternal once we move the distinction between queues and topics

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -115,9 +115,9 @@
       endpoint.acknowledgeDeliveries(acks);
    }
    
-   public void acknowledgeDelivery(Ack ack) throws JMSException
+   public boolean acknowledgeDelivery(Ack ack) throws JMSException
    {
-      endpoint.acknowledgeDelivery(ack);
+      return endpoint.acknowledgeDelivery(ack);
    }
 
    public void addTemporaryDestination(JBossDestination destination) throws JMSException

Modified: trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -310,7 +310,7 @@
    	{
    		log.warn("WARNING! POTENTIAL SECURITY RISK. It has been detected that the MessageSucker component " +
    				   "which sucks messages from one node to another has not had its password changed from the installation default. " +
-   				   "Please see the JBoss Messaging userguide for instructions on how to do this.");
+   				   "Please see the JBoss Messaging user guide for instructions on how to do this.");
    	}
    }
 

Modified: trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -156,6 +156,7 @@
    public static final int RESP_SESSION_CREATEBROWSERDELEGATE = 100301;   
    public static final int RESP_SESSION_CREATEQUEUE = 100302;   
    public static final int RESP_SESSION_CREATETOPIC = 100303;
+   public static final int RESP_SESSION_ACKNOWLEDGEDELIVERY = 100304;
    
    // Browser
    // -----------------------
@@ -344,6 +345,10 @@
          case RESP_SESSION_CREATETOPIC:
             packet = new SessionCreateTopicResponse();
             break; 
+         case RESP_SESSION_ACKNOWLEDGEDELIVERY:
+         	packet = new SessionAcknowledgeDeliveryResponse();
+         	break;
+         	
             
          // Browser
          case RESP_BROWSER_NEXTMESSAGE:

Modified: trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -75,9 +75,9 @@
          throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
       }
       
-      endpoint.acknowledgeDelivery(ack);
+      boolean res = endpoint.acknowledgeDelivery(ack);
       
-      return null;
+      return new SessionAcknowledgeDeliveryResponse(res);
    }
 
    public void write(DataOutputStream os) throws Exception

Added: trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.wireformat;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+
+public class SessionAcknowledgeDeliveryResponse extends ResponseSupport
+{
+   private boolean res;
+   
+   public SessionAcknowledgeDeliveryResponse()
+   {      
+   }
+   
+   public SessionAcknowledgeDeliveryResponse(boolean res)
+   {
+      super(PacketSupport.RESP_SESSION_ACKNOWLEDGEDELIVERY);
+      
+      this.res = res;
+   }
+
+   public Object getResponse()
+   {
+      return Boolean.valueOf(res);
+   }
+   
+   public void write(DataOutputStream os) throws Exception
+   {
+      super.write(os);
+      
+      os.writeBoolean(res);
+      
+      os.flush();
+   }
+   
+   public void read(DataInputStream is) throws Exception
+   {
+      res = is.readBoolean();
+   }
+
+}
+

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -118,8 +118,6 @@
    {
       super(ds, tm, sqlProperties, createTablesOnStartup);
       
-      log.info("Set blob " + supportsBlobSelect);
-      
       //usingBatchUpdates is currently ignored due to sketchy support from databases
       
       this.usingBinaryStream = usingBinaryStream;

Modified: trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -188,7 +188,6 @@
    
    public void setSupportsBlobOnSelect(boolean b)
    {
-   	log.info("Calling set blob on select " + b);
    	this.supportsBlobOnSelect = b;
    }
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-10-21 23:29:00 UTC (rev 3238)
@@ -1135,7 +1135,7 @@
             conn.close();
          }
       }
-   }
+ }
 
    public void testFailoverMessageOnServer() throws Exception
    {
@@ -1173,7 +1173,7 @@
             }
             if (event == null)
             {
-               fail("Did not get expected FAILOVER_COMPLETED event");
+               fail("Did not get expected FAILOVER_STARTED event");
             }
          }
 
@@ -1181,8 +1181,12 @@
          // test the client-side failover valve
 
          TextMessage tm = (TextMessage)cons.receive(60000);
+    
          assertNotNull(tm);
          assertEquals("blip", tm.getText());
+         
+         tm = (TextMessage)cons.receive(1000);
+         assertNull(tm);
       }
       finally
       {
@@ -1400,64 +1404,64 @@
       failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
    }
 
-   // This test is commented out until http://jira.jboss.com/jira/browse/JBMESSAGING-604 is complete
-//   public void testFailureRightAfterSendTransaction() throws Exception
-//   {
-//      Connection conn = null;
-// 
-//      try
-//      {
-//         conn = this.createConnectionOnServer(cf, 1);
-//
-//         assertEquals(1, getServerId(conn));
-//
-//         // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
-//         // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
-//         JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
-//            getDelegate()).getRemotingConnection();
-//         rc.removeConnectionListener();
-//
-//         // poison the server
-//         ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
-//
-//         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-//         conn.start();
-//
-//         MessageProducer producer = session.createProducer(queue[0]);
-//
-//         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-//
-//         MessageConsumer consumer = session.createConsumer(queue[0]);
-//
-//         producer.send(session.createTextMessage("before-poison1"));
-//         producer.send(session.createTextMessage("before-poison2"));
-//         producer.send(session.createTextMessage("before-poison3"));
-//         session.commit();
-//
-//         Thread.sleep(2000);
-//
-//         for (int i = 1; i <= 3; i++)
-//         {
-//            TextMessage tm = (TextMessage) consumer.receive(5000);
-//
-//            assertNotNull(tm);
-//
-//            assertEquals("before-poison" + i, tm.getText());
-//         }         
-//
-//         assertNull(consumer.receive(3000));
-//
-//      }
-//      finally
-//      {
-//         if (conn != null)
-//         {
-//            conn.close();
-//         }
-//      }
-//   }
+   public void testFailureRightAfterSendTransaction() throws Exception
+   {
+      Connection conn = null;
+ 
+      try
+      {
+         conn = this.createConnectionOnServer(cf, 1);
 
+         assertEquals(1, getServerId(conn));
+
+         // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+         // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+         JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+            getDelegate()).getRemotingConnection();
+         rc.removeConnectionListener();
+
+         // poison the server
+         ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
+
+         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         conn.start();
+
+         MessageProducer producer = session.createProducer(queue[0]);
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         MessageConsumer consumer = session.createConsumer(queue[0]);
+
+         producer.send(session.createTextMessage("before-poison1"));
+         producer.send(session.createTextMessage("before-poison2"));
+         producer.send(session.createTextMessage("before-poison3"));
+         session.commit();
+
+         Thread.sleep(2000);
+
+         for (int i = 1; i <= 3; i++)
+         {
+            TextMessage tm = (TextMessage) consumer.receive(5000);
+
+            assertNotNull(tm);
+
+            assertEquals("before-poison" + i, tm.getText());
+         }         
+
+         assertNull(consumer.receive(3000));
+         
+         session.commit();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
    public void testCloseConsumer() throws Exception
    {
       Connection conn0 = null;
@@ -1831,9 +1835,17 @@
 
          TextMessage tm = (TextMessage)consumer.receive(5000);
 
-         assertNotNull(tm);
+         if(typeOfFailure == PoisonInterceptor.FAIL_AFTER_ACKNOWLEDGE_DELIVERY)
+         {
+         	//With auto_ack we won't get the message - remember auto ack is "at most once"
+         	assertNull(tm);
+         }
+         else
+         {        
+            assertNotNull(tm);
 
-         assertEquals("before-poison", tm.getText());
+            assertEquals("before-poison", tm.getText());
+         }
 
          checkEmpty(queue[1], 0);
       }




More information about the jboss-cvs-commits mailing list