[jboss-cvs] JBoss Messaging SVN: r8424 - in branches/Branch_1_4: integration/EAP5/etc and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 22 22:20:06 EDT 2011


Author: gaohoward
Date: 2011-08-22 22:20:06 -0400 (Mon, 22 Aug 2011)
New Revision: 8424

Modified:
   branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
   branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java
   branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java
   branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java
   branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java
   branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java
   branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java
   branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
JBMESSAGING-1886


Modified: branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml	2011-08-23 02:20:06 UTC (rev 8424)
@@ -163,6 +163,9 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->send(..))">
       <advice name="handleSend" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->messageChanged(..))">
+      <advice name="handleMessageChanged" aspect="org.jboss.jms.client.container.SessionAspect"/>
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createConsumerDelegate(..))">
       <advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>   

Modified: branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml	2011-08-23 02:20:06 UTC (rev 8424)
@@ -163,6 +163,9 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->send(..))">
       <advice name="handleSend" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->messageChanged(..))">
+      <advice name="handleMessageChanged" aspect="org.jboss.jms.client.container.SessionAspect"/>
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createConsumerDelegate(..))">
       <advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>   

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -163,6 +163,7 @@
             finally
             {            
                state.getClientAckList().clear();
+               state.clearAckMap();
                
                state.setAutoAckInfo(null);
             }
@@ -258,6 +259,7 @@
             }
             
             result = state.addToClientAckList(info);
+            state.getAckMap().put(info.getMessageProxy().getMessage().getMessageID(), info);
                
          }
          // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
@@ -269,12 +271,14 @@
             if (trace) { log.trace(this + " added " + info + " to session state"); }
             
             state.setAutoAckInfo(info);         
+            state.getAckMap().put(info.getMessageProxy().getMessage().getMessageID(), info);
          }
          else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
          {
             if (trace) { log.trace(this + " added to DUPS_OK_ACKNOWLEDGE list delivery " + info); }
             
             state.getClientAckList().add(info);
+            state.getAckMap().put(info.getMessageProxy().getMessage().getMessageID(), info);
             
             //Also set here - this would be used for recovery in a message listener
             state.setAutoAckInfo(info);
@@ -393,6 +397,7 @@
                   finally
                   {                  
                      acks.clear();
+                     state.clearAckMap();
                      state.setAutoAckInfo(null);
                   }
                }    
@@ -482,6 +487,7 @@
             acknowledgeDeliveries(del, state.getClientAckList());
          
             state.getClientAckList().clear();
+            state.clearAckMap();
          }      
            
          return null;
@@ -519,6 +525,7 @@
             List dels = state.getClientAckList();
             
             state.setClientAckList(new ArrayList());
+            state.clearAckMap();
             
             del.redeliver(dels);
    
@@ -797,7 +804,22 @@
       
       return new TextMessageProxy(jbm);
    }      
+   
+   //this method makes sure a recovered message gets its original contents.
+   public Object handleMessageChanged(Invocation invocation) throws Throwable
+   {
+      MethodInvocation mi = (MethodInvocation)invocation;
+      Long messageId = (Long)mi.getArguments()[0];
       
+      SessionState state = getState(invocation);
+      DeliveryInfo info = state.getAckMap().get(messageId);
+      if (info != null)
+      {
+         info.copyMessage();
+      }
+      return null;
+   }
+      
    public Object handleSetMessageListener(Invocation invocation) throws Throwable
    {
       if (trace) { log.trace("setMessageListener()"); }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -554,6 +554,11 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
+   public void messageChanged(long messageID)
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    // Protected ------------------------------------------------------------------------------------
 
    // Package Private ------------------------------------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -96,6 +96,8 @@
    
    // List<DeliveryInfo>
    private List clientAckList;
+   
+   private Map<Long, DeliveryInfo> ackMap;
 
    private DeliveryInfo autoAckInfo;
    private Map callbackHandlers;
@@ -168,6 +170,8 @@
       }
 
       clientAckList = new ArrayList();
+      
+      ackMap = new HashMap<Long, DeliveryInfo>();
 
       // TODO could optimise this to use the same map of callbackmanagers (which holds refs
       // to callbackhandlers) in the connection, instead of maintaining another map
@@ -356,6 +360,7 @@
                   if (!info.getMessageProxy().getMessage().isReliable())
                   {
                      i.remove();
+                     ackMap.remove(info.getMessageProxy().getMessage().getMessageID());
                      log.trace("removed non persistent delivery " + info);
                   }
                }
@@ -447,6 +452,16 @@
       return clientAckList;
    }
    
+   public Map<Long, DeliveryInfo> getAckMap()
+   {
+      return ackMap;
+   }
+   
+   public void clearAckMap()
+   {
+      ackMap.clear();
+   }
+   
    public boolean addToClientAckList(DeliveryInfo info)
    {
       synchronized (ackLock)
@@ -454,6 +469,8 @@
          if (ackSource == info.getSource())
          {
             clientAckList.add(info);
+            Long messageId = info.getMessageProxy().getMessage().getMessageID();
+            ackMap.put(messageId, info);
             return true;
          }
          else

Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -21,6 +21,8 @@
   */
 package org.jboss.jms.delegate;
 
+import javax.jms.JMSException;
+
 import org.jboss.jms.message.MessageProxy;
 
 /**
@@ -125,6 +127,11 @@
       return msg.getDeliveryId();
    }
 
+   public void copyMessage() throws JMSException
+   {
+      msg = msg.duplicate();
+   }
+
    // Protected -----------------------------------------------------
 
    // Package Private -----------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -97,4 +97,6 @@
    void acknowledgeAll() throws JMSException;
 
    void processMessageTimeout() throws JMSException;
+
+   void messageChanged(long messageID);
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -48,6 +48,11 @@
       super(message);
    }
 
+   public BytesMessageProxy(BytesMessageProxy proxy) throws JMSException
+   {
+      super(proxy);
+   }
+
    public long getBodyLength() throws JMSException
    {
       if (!bodyReadOnly)
@@ -299,4 +304,10 @@
       ((BytesMessage)message).reset();
       bodyReadOnly = true;
    }
+   
+   @Override
+   public MessageProxy duplicate() throws JMSException
+   {
+      return new BytesMessageProxy(this);
+   }
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -49,6 +49,11 @@
       super(message);
    }
    
+   public MapMessageProxy(MapMessageProxy proxy) throws JMSException
+   {
+      super(proxy);
+   }
+
    public boolean getBoolean(String name) throws JMSException
    {
       return ((MapMessage)message).getBoolean(name);
@@ -233,4 +238,10 @@
    {
       return ((MapMessage)message).itemExists(name);
    }
+   
+   @Override
+   public MessageProxy duplicate() throws JMSException
+   {
+      return new MapMessageProxy(this);
+   }
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -138,6 +138,31 @@
 
    // Message implementation ----------------------------------------
 
+   /**
+    * Copy constructor: creates a full copy of the given proxy (the underlying message is copied via doCopy()).
+    * @throws JMSException 
+    */
+   public MessageProxy(MessageProxy proxy) throws JMSException
+   {
+      this.deliveryId = proxy.deliveryId;
+      
+      this.message = proxy.message.doCopy();
+      
+      this.deliveryCount = proxy.deliveryCount;
+      
+      this.needToCopyHeader = true;
+      
+      this.needToCopyBody = true;
+      
+      this.propertiesReadOnly = true;
+      
+      this.bodyReadOnly = true;
+      
+      this.delegate = proxy.delegate;
+      
+      this.cc = proxy.cc;
+   }
+
    public String getJMSMessageID() throws JMSException
    {
       return message.getJMSMessageID();
@@ -431,6 +456,10 @@
 
    public void clearBody() throws JMSException
    {
+      if (delegate != null)
+      {
+         delegate.messageChanged(message.getMessageID());
+      }
       bodyChange();
       message.clearBody();
       bodyReadOnly = false;
@@ -518,5 +547,10 @@
       return this.source;
    }
 
+   public MessageProxy duplicate() throws JMSException
+   {
+      return new MessageProxy(this);
+   }
+
    // Inner classes -------------------------------------------------   
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -51,6 +51,11 @@
       super(message);
    }
    
+   public ObjectMessageProxy(ObjectMessageProxy proxy) throws JMSException
+   {
+      super(proxy);
+   }
+
    public void setObject(Serializable object) throws JMSException
    {
       if (bodyReadOnly)
@@ -78,5 +83,10 @@
 	   cachedObject = null;
    }
 
+   @Override
+   public MessageProxy duplicate() throws JMSException
+   {
+      return new ObjectMessageProxy(this);
+   }
 
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -48,6 +48,11 @@
       super(message);
    }
 
+   public StreamMessageProxy(StreamMessageProxy proxy) throws JMSException
+   {
+      super(proxy);
+   }
+
    public boolean readBoolean() throws JMSException
    {
       if (!bodyReadOnly)
@@ -237,5 +242,11 @@
       bodyChange();
       ((StreamMessage)message).writeString(value);
    }
+   
+   @Override
+   public MessageProxy duplicate() throws JMSException
+   {
+      return new StreamMessageProxy(this);
+   }
 }
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -46,6 +46,11 @@
       super(message);
    }
    
+   public TextMessageProxy(TextMessageProxy proxy) throws JMSException
+   {
+      super(proxy);
+   }
+
    public void setText(String string) throws JMSException
    {
       if (bodyReadOnly)
@@ -58,4 +63,10 @@
    {
       return ((TextMessage)message).getText();
    }
+   
+   @Override
+   public MessageProxy duplicate() throws JMSException
+   {
+      return new TextMessageProxy(this);
+   }
 }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2011-08-23 02:20:06 UTC (rev 8424)
@@ -21,14 +21,18 @@
   */
 package org.jboss.test.messaging.jms;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Session;
+import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import javax.jms.TopicConnection;
 import javax.jms.TopicPublisher;
@@ -382,8 +386,200 @@
 		}
    }
 
+	//JBMESSAGING-1886
+	public void testMessageBodyClearedBeforeRecover() throws Exception
+   {
+      Connection conn = null;
+      
+      try
+      {     
+         conn = cf.createConnection();
+   
+         Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = producerSess.createProducer(queue1);
+   
+         Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSess.createConsumer(queue1);
+         conn.start();
+   
+         final int NUM_MESSAGES = 10;
+   
+         //1 Send some text messages
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = producerSess.createTextMessage("text-message-to-recover");
+            producer.send(m);
+         }
+   
+         log.trace("Sent text messages");
+   
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage m = (TextMessage)consumer.receive(2000);
+            m.clearBody();
+         }
+   
+         consumerSess.recover();
+   
+         log.trace("Session recover called");
+
+         TextMessage tm = null;
+         for(int i = 0; i < NUM_MESSAGES; i++)
+         {
+            tm = (TextMessage)consumer.receive();
+            assertEquals("text-message-to-recover", tm.getText());
+         }
+         
+         tm.acknowledge();
+         
+         //2 Send some byte messages
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            BytesMessage m = producerSess.createBytesMessage();
+            m.writeBytes(new byte[] {1, 2, 3});
+            producer.send(m);
+         }
+   
+         log.trace("Sent byte messages");
+   
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            BytesMessage m = (BytesMessage)consumer.receive(2000);
+            m.clearBody();
+         }
+   
+         consumerSess.recover();
+   
+         log.trace("Session recover called");
+
+         BytesMessage bm = null;
+         for(int i = 0; i < NUM_MESSAGES; i++)
+         {
+            byte[] buffer = new byte[3];
+            bm = (BytesMessage)consumer.receive();
+            
+            assertEquals(3, bm.getBodyLength());
+            
+            bm.readBytes(buffer);
+            
+            //make sure the buffer is {1,2,3}
+            assertEquals(1, buffer[0]);
+            assertEquals(2, buffer[1]);
+            assertEquals(3, buffer[2]);
+         }
+         
+         bm.acknowledge();
+         
+         //3 Send some Map messages
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            MapMessage m = producerSess.createMapMessage();
+            m.setString("map-message", "hello-world");
+            producer.send(m);
+         }
+   
+         log.trace("Sent map messages");
+   
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            MapMessage m = (MapMessage)consumer.receive(2000);
+            m.clearBody();
+         }
+   
+         consumerSess.recover();
+   
+         log.trace("Session recover called");
+
+         MapMessage mm = null;
+         for(int i = 0; i < NUM_MESSAGES; i++)
+         {
+            mm = (MapMessage)consumer.receive();
+            
+            String value = mm.getString("map-message");
+            
+            assertEquals("hello-world", value);
+         }
+         
+         mm.acknowledge();
+         
+         //4 Send some Object messages
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            ObjectMessage m = producerSess.createObjectMessage(new String("object-message"));
+            producer.send(m);
+         }
+   
+         log.trace("Sent object messages");
+   
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            ObjectMessage m = (ObjectMessage)consumer.receive(2000);
+            m.clearBody();
+         }
+   
+         consumerSess.recover();
+   
+         log.trace("Session recover called");
+
+         ObjectMessage om = null;
+         for(int i = 0; i < NUM_MESSAGES; i++)
+         {
+            om = (ObjectMessage)consumer.receive();
+            
+            String value = (String)om.getObject();
+            
+            assertEquals("object-message", value);
+         }
+         
+         om.acknowledge();
+
+         //5 Send some Stream messages
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            StreamMessage m = producerSess.createStreamMessage();
+            m.writeString("stream-message");
+            producer.send(m);
+         }
+   
+         log.trace("Sent stream messages");
+   
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            StreamMessage m = (StreamMessage)consumer.receive(2000);
+            String v = m.readString();
+            assertEquals("stream-message", v);
+            m.clearBody();
+         }
+   
+         consumerSess.recover();
+   
+         log.trace("Session recover called");
+
+         StreamMessage sm = null;
+         for(int i = 0; i < NUM_MESSAGES; i++)
+         {
+            sm = (StreamMessage)consumer.receive();
+            
+            String value = sm.readString();
+            
+            assertEquals("stream-message", value);
+         }
+         
+         sm.acknowledge();
+         
+         checkEmpty(queue1);
+   
+         conn.close();
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }	
 	
-	
 	/**
 	 * Send some messages, acknowledge them individually and verify they are not resent after
     * recovery.



More information about the jboss-cvs-commits mailing list