[jboss-cvs] JBoss Messaging SVN: r8424 - in branches/Branch_1_4: integration/EAP5/etc and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 22 22:20:06 EDT 2011
Author: gaohoward
Date: 2011-08-22 22:20:06 -0400 (Mon, 22 Aug 2011)
New Revision: 8424
Modified:
branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java
branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java
branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java
branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java
branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java
branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java
branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
JBMESSAGING-1886
Modified: branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml 2011-08-23 02:20:06 UTC (rev 8424)
@@ -163,6 +163,9 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->send(..))">
<advice name="handleSend" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->messageChanged(..))">
+ <advice name="handleMessageChanged" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createConsumerDelegate(..))">
<advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
Modified: branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml 2011-08-23 02:20:06 UTC (rev 8424)
@@ -163,6 +163,9 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->send(..))">
<advice name="handleSend" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->messageChanged(..))">
+ <advice name="handleMessageChanged" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createConsumerDelegate(..))">
<advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -163,6 +163,7 @@
finally
{
state.getClientAckList().clear();
+ state.clearAckMap();
state.setAutoAckInfo(null);
}
@@ -258,6 +259,7 @@
}
result = state.addToClientAckList(info);
+ state.getAckMap().put(info.getMessageProxy().getMessage().getMessageID(), info);
}
// if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
@@ -269,12 +271,14 @@
if (trace) { log.trace(this + " added " + info + " to session state"); }
state.setAutoAckInfo(info);
+ state.getAckMap().put(info.getMessageProxy().getMessage().getMessageID(), info);
}
else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
if (trace) { log.trace(this + " added to DUPS_OK_ACKNOWLEDGE list delivery " + info); }
state.getClientAckList().add(info);
+ state.getAckMap().put(info.getMessageProxy().getMessage().getMessageID(), info);
//Also set here - this would be used for recovery in a message listener
state.setAutoAckInfo(info);
@@ -393,6 +397,7 @@
finally
{
acks.clear();
+ state.clearAckMap();
state.setAutoAckInfo(null);
}
}
@@ -482,6 +487,7 @@
acknowledgeDeliveries(del, state.getClientAckList());
state.getClientAckList().clear();
+ state.clearAckMap();
}
return null;
@@ -519,6 +525,7 @@
List dels = state.getClientAckList();
state.setClientAckList(new ArrayList());
+ state.clearAckMap();
del.redeliver(dels);
@@ -797,7 +804,22 @@
return new TextMessageProxy(jbm);
}
+
+ //this message make sure recovered message gets its original contents.
+ public Object handleMessageChanged(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+ Long messageId = (Long)mi.getArguments()[0];
+ SessionState state = getState(invocation);
+ DeliveryInfo info = state.getAckMap().get(messageId);
+ if (info != null)
+ {
+ info.copyMessage();
+ }
+ return null;
+ }
+
public Object handleSetMessageListener(Invocation invocation) throws Throwable
{
if (trace) { log.trace("setMessageListener()"); }
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -554,6 +554,11 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ public void messageChanged(long messageID)
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -96,6 +96,8 @@
// List<DeliveryInfo>
private List clientAckList;
+
+ private Map<Long, DeliveryInfo> ackMap;
private DeliveryInfo autoAckInfo;
private Map callbackHandlers;
@@ -168,6 +170,8 @@
}
clientAckList = new ArrayList();
+
+ ackMap = new HashMap<Long, DeliveryInfo>();
// TODO could optimise this to use the same map of callbackmanagers (which holds refs
// to callbackhandlers) in the connection, instead of maintaining another map
@@ -356,6 +360,7 @@
if (!info.getMessageProxy().getMessage().isReliable())
{
i.remove();
+ ackMap.remove(info.getMessageProxy().getMessage().getMessageID());
log.trace("removed non persistent delivery " + info);
}
}
@@ -447,6 +452,16 @@
return clientAckList;
}
+ public Map<Long, DeliveryInfo> getAckMap()
+ {
+ return ackMap;
+ }
+
+ public void clearAckMap()
+ {
+ ackMap.clear();
+ }
+
public boolean addToClientAckList(DeliveryInfo info)
{
synchronized (ackLock)
@@ -454,6 +469,8 @@
if (ackSource == info.getSource())
{
clientAckList.add(info);
+ Long messageId = info.getMessageProxy().getMessage().getMessageID();
+ ackMap.put(messageId, info);
return true;
}
else
Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/DeliveryInfo.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.delegate;
+import javax.jms.JMSException;
+
import org.jboss.jms.message.MessageProxy;
/**
@@ -125,6 +127,11 @@
return msg.getDeliveryId();
}
+ public void copyMessage() throws JMSException
+ {
+ msg = msg.duplicate();
+ }
+
// Protected -----------------------------------------------------
// Package Private -----------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -97,4 +97,6 @@
void acknowledgeAll() throws JMSException;
void processMessageTimeout() throws JMSException;
+
+ void messageChanged(long messageID);
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/BytesMessageProxy.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -48,6 +48,11 @@
super(message);
}
+ public BytesMessageProxy(BytesMessageProxy proxy) throws JMSException
+ {
+ super(proxy);
+ }
+
public long getBodyLength() throws JMSException
{
if (!bodyReadOnly)
@@ -299,4 +304,10 @@
((BytesMessage)message).reset();
bodyReadOnly = true;
}
+
+ @Override
+ public MessageProxy duplicate() throws JMSException
+ {
+ return new BytesMessageProxy(this);
+ }
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/MapMessageProxy.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -49,6 +49,11 @@
super(message);
}
+ public MapMessageProxy(MapMessageProxy proxy) throws JMSException
+ {
+ super(proxy);
+ }
+
public boolean getBoolean(String name) throws JMSException
{
return ((MapMessage)message).getBoolean(name);
@@ -233,4 +238,10 @@
{
return ((MapMessage)message).itemExists(name);
}
+
+ @Override
+ public MessageProxy duplicate() throws JMSException
+ {
+ return new MapMessageProxy(this);
+ }
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/MessageProxy.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -138,6 +138,31 @@
// Message implementation ----------------------------------------
+ /**
+ * full copy
+ * @throws JMSException
+ */
+ public MessageProxy(MessageProxy proxy) throws JMSException
+ {
+ this.deliveryId = proxy.deliveryId;
+
+ this.message = proxy.message.doCopy();
+
+ this.deliveryCount = proxy.deliveryCount;
+
+ this.needToCopyHeader = true;
+
+ this.needToCopyBody = true;
+
+ this.propertiesReadOnly = true;
+
+ this.bodyReadOnly = true;
+
+ this.delegate = proxy.delegate;
+
+ this.cc = proxy.cc;
+ }
+
public String getJMSMessageID() throws JMSException
{
return message.getJMSMessageID();
@@ -431,6 +456,10 @@
public void clearBody() throws JMSException
{
+ if (delegate != null)
+ {
+ delegate.messageChanged(message.getMessageID());
+ }
bodyChange();
message.clearBody();
bodyReadOnly = false;
@@ -518,5 +547,10 @@
return this.source;
}
+ public MessageProxy duplicate() throws JMSException
+ {
+ return new MessageProxy(this);
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/ObjectMessageProxy.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -51,6 +51,11 @@
super(message);
}
+ public ObjectMessageProxy(ObjectMessageProxy proxy) throws JMSException
+ {
+ super(proxy);
+ }
+
public void setObject(Serializable object) throws JMSException
{
if (bodyReadOnly)
@@ -78,5 +83,10 @@
cachedObject = null;
}
+ @Override
+ public MessageProxy duplicate() throws JMSException
+ {
+ return new ObjectMessageProxy(this);
+ }
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/StreamMessageProxy.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -48,6 +48,11 @@
super(message);
}
+ public StreamMessageProxy(StreamMessageProxy proxy) throws JMSException
+ {
+ super(proxy);
+ }
+
public boolean readBoolean() throws JMSException
{
if (!bodyReadOnly)
@@ -237,5 +242,11 @@
bodyChange();
((StreamMessage)message).writeString(value);
}
+
+ @Override
+ public MessageProxy duplicate() throws JMSException
+ {
+ return new StreamMessageProxy(this);
+ }
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/src/main/org/jboss/jms/message/TextMessageProxy.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -46,6 +46,11 @@
super(message);
}
+ public TextMessageProxy(TextMessageProxy proxy) throws JMSException
+ {
+ super(proxy);
+ }
+
public void setText(String string) throws JMSException
{
if (bodyReadOnly)
@@ -58,4 +63,10 @@
{
return ((TextMessage)message).getText();
}
+
+ @Override
+ public MessageProxy duplicate() throws JMSException
+ {
+ return new TextMessageProxy(this);
+ }
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2011-08-17 00:58:55 UTC (rev 8423)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2011-08-23 02:20:06 UTC (rev 8424)
@@ -21,14 +21,18 @@
*/
package org.jboss.test.messaging.jms;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
import javax.jms.Session;
+import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
@@ -382,8 +386,200 @@
}
}
+ //JBMESSAGING-1886
+ public void testMessageBodyClearedBeforeRecover() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
+
+ Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
+
+ final int NUM_MESSAGES = 10;
+
+ //1 Send some text messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage m = producerSess.createTextMessage("text-message-to-recover");
+ producer.send(m);
+ }
+
+ log.trace("Sent text messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage m = (TextMessage)consumer.receive(2000);
+ m.clearBody();
+ }
+
+ consumerSess.recover();
+
+ log.trace("Session recover called");
+
+ TextMessage tm = null;
+ for(int i = 0; i < NUM_MESSAGES; i++)
+ {
+ tm = (TextMessage)consumer.receive();
+ assertEquals("text-message-to-recover", tm.getText());
+ }
+
+ tm.acknowledge();
+
+ //2 Send some byte messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ BytesMessage m = producerSess.createBytesMessage();
+ m.writeBytes(new byte[] {1, 2, 3});
+ producer.send(m);
+ }
+
+ log.trace("Sent byte messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ BytesMessage m = (BytesMessage)consumer.receive(2000);
+ m.clearBody();
+ }
+
+ consumerSess.recover();
+
+ log.trace("Session recover called");
+
+ BytesMessage bm = null;
+ for(int i = 0; i < NUM_MESSAGES; i++)
+ {
+ byte[] buffer = new byte[3];
+ bm = (BytesMessage)consumer.receive();
+
+ assertEquals(3, bm.getBodyLength());
+
+ bm.readBytes(buffer);
+
+ //make sure the buffer is {1,2,3}
+ assertEquals(1, buffer[0]);
+ assertEquals(2, buffer[1]);
+ assertEquals(3, buffer[2]);
+ }
+
+ bm.acknowledge();
+
+ //3 Send some Map messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ MapMessage m = producerSess.createMapMessage();
+ m.setString("map-message", "hello-world");
+ producer.send(m);
+ }
+
+ log.trace("Sent map messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ MapMessage m = (MapMessage)consumer.receive(2000);
+ m.clearBody();
+ }
+
+ consumerSess.recover();
+
+ log.trace("Session recover called");
+
+ MapMessage mm = null;
+ for(int i = 0; i < NUM_MESSAGES; i++)
+ {
+ mm = (MapMessage)consumer.receive();
+
+ String value = mm.getString("map-message");
+
+ assertEquals("hello-world", value);
+ }
+
+ mm.acknowledge();
+
+ //4 Send some Object messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ObjectMessage m = producerSess.createObjectMessage(new String("object-message"));
+ producer.send(m);
+ }
+
+ log.trace("Sent object messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ObjectMessage m = (ObjectMessage)consumer.receive(2000);
+ m.clearBody();
+ }
+
+ consumerSess.recover();
+
+ log.trace("Session recover called");
+
+ ObjectMessage om = null;
+ for(int i = 0; i < NUM_MESSAGES; i++)
+ {
+ om = (ObjectMessage)consumer.receive();
+
+ String value = (String)om.getObject();
+
+ assertEquals("object-message", value);
+ }
+
+ om.acknowledge();
+
+ //5 Send some Stream messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ StreamMessage m = producerSess.createStreamMessage();
+ m.writeString("stream-message");
+ producer.send(m);
+ }
+
+ log.trace("Sent stream messages");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ StreamMessage m = (StreamMessage)consumer.receive(2000);
+ String v = m.readString();
+ assertEquals("stream-message", v);
+ m.clearBody();
+ }
+
+ consumerSess.recover();
+
+ log.trace("Session recover called");
+
+ StreamMessage sm = null;
+ for(int i = 0; i < NUM_MESSAGES; i++)
+ {
+ sm = (StreamMessage)consumer.receive();
+
+ String value = sm.readString();
+
+ assertEquals("stream-message", value);
+ }
+
+ sm.acknowledge();
+
+ checkEmpty(queue1);
+
+ conn.close();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
-
/**
* Send some messages, acknowledge them individually and verify they are not resent after
* recovery.
More information about the jboss-cvs-commits
mailing list