[jboss-cvs] JBoss Messaging SVN: r3104 - in trunk: src/main/org/jboss/messaging/core/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 13 11:43:00 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-09-13 11:43:00 -0400 (Thu, 13 Sep 2007)
New Revision: 3104
Modified:
trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1063
Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-09-13 15:38:17 UTC (rev 3103)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-09-13 15:43:00 UTC (rev 3104)
@@ -60,27 +60,27 @@
* $Id$
*/
public class ProducerAspect
-{
+{
// Constants ------------------------------------------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(ProducerAspect.class);
-
+
// Attributes -----------------------------------------------------------------------------------
-
+
private boolean trace = log.isTraceEnabled();
-
+
// Static ---------------------------------------------------------------------------------------
-
+
// Constructors ---------------------------------------------------------------------------------
-
+
// Public ---------------------------------------------------------------------------------------
public Object handleSend(Invocation invocation) throws Throwable
- {
+ {
MethodInvocation mi = (MethodInvocation)invocation;
-
+
Object[] args = mi.getArguments();
-
+
Destination destination = (Destination)args[0];
Message m = (Message)args[1];
int deliveryMode = ((Integer)args[2]).intValue();
@@ -89,13 +89,6 @@
boolean keepID = args.length>5? ((Boolean)args[5]).booleanValue() : false;
- String correlatedMessage = null;
-
- if (keepID)
- {
- correlatedMessage = m.getJMSMessageID();
- }
-
// configure the message for sending, using attributes stored as metadata
ProducerState producerState = getProducerState(mi);
@@ -131,7 +124,7 @@
timeToLive = producerState.getTimeToLive();
if (trace) { log.trace("Using producer's default timeToLive: " + timeToLive); }
}
-
+
if (timeToLive == 0)
{
// Zero implies never expires
@@ -146,7 +139,7 @@
{
// use destination from producer
destination = producerState.getDestination();
-
+
if (destination == null)
{
throw new UnsupportedOperationException("Destination not specified");
@@ -168,15 +161,14 @@
"these destinations must be equal");
}
}
-
+
SessionState sessionState = (SessionState)producerState.getParent();
-
+
// Generate the message id
ConnectionState connectionState = (ConnectionState)sessionState.getParent();
-
- long id =
- connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
+ long id = 0;
+
JBossMessage messageToSend;
boolean foreign = false;
@@ -185,7 +177,7 @@
// it's a foreign message
foreign = true;
-
+
// JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
// a message whose implementation is not one of its own.
@@ -214,37 +206,43 @@
{
messageToSend = new JBossMessage(m, 0);
}
-
+
messageToSend.setJMSMessageID(null);
-
+
//We must set the destination *after* converting from foreign message
- messageToSend.setJMSDestination(destination);
+ messageToSend.setJMSDestination(destination);
}
else
{
// get the actual message
MessageProxy proxy = (MessageProxy)m;
+ if (keepID)
+ {
+ id = ((MessageProxy)m).getMessage().getMessageID();
+ }
+
+
m.setJMSDestination(destination);
-
+
//The following line executed on the proxy should cause a copy to occur
//if it is necessary
proxy.setJMSMessageID(null);
-
+
//Get the underlying message
- messageToSend = proxy.getMessage();
-
+ messageToSend = proxy.getMessage();
+
proxy.beforeSend();
}
-
+
// Set the new id
-
- messageToSend.setMessageId(id);
- if (correlatedMessage != null)
+ if (!keepID && id == 0l)
{
- messageToSend.setJMSCorrelationID(correlatedMessage);
+ id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
}
+
+ messageToSend.setMessageId(id);
// This only really used for BytesMessages and StreamMessages to reset their state
messageToSend.doBeforeSend();
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-13 15:38:17 UTC (rev 3103)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-13 15:43:00 UTC (rev 3104)
@@ -642,33 +642,9 @@
//Maybe we need to persist the message itself
Message m = ref.getMessage();
- storeMessage(m, psInsertMessage, false);
+ rows = storeMessage(m, psInsertMessage, psUpdateMessage);
- if (psUpdateMessage != null)
- {
- psInsertMessage.setLong(7, m.getMessageID());
- rows = psInsertMessage.executeUpdate();
-
- if (rows == 1)
- {
- bindBlobs(m, psUpdateMessage, 1, 2);
- psUpdateMessage.setLong(3, m.getMessageID());
- rows = psUpdateMessage.executeUpdate();
- if (rows != 1)
- {
- throw new IllegalStateException("Couldn't update messageId=" +
- m.getMessageID() + " on paging");
- }
- }
- }
- else
- {
- bindBlobs(m, psInsertMessage, 7, 8);
- psInsertMessage.setLong(9, m.getMessageID());
- rows = psInsertMessage.executeUpdate();
- }
-
if (trace) { log.trace("Inserted " + rows + " rows"); }
}
@@ -680,7 +656,7 @@
closeStatement(psInsertMessage);
closeStatement(psUpdateMessage);
}
- }
+ }
}
new PageReferencesRunner().executeWithRetry();
@@ -1192,8 +1168,9 @@
public Object doTransaction() throws Exception
{
PreparedStatement psReference = null;
- PreparedStatement psMessage = null;
-
+ PreparedStatement psInsertMessage = null;
+ PreparedStatement psUpdateMessage = null;
+
Message m = ref.getMessage();
try
@@ -1210,11 +1187,18 @@
if (!m.isPersisted())
{
// First time so persist the message
- psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-
- storeMessage(m, psMessage, true);
-
- rows = psMessage.executeUpdate();
+ if (isSupportsBlobSelect())
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
+ }
+ else
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
+ }
+
+
+ rows = storeMessage(m, psInsertMessage, psUpdateMessage);
if (trace) { log.trace("Inserted/updated " + rows + " rows"); }
@@ -1229,8 +1213,9 @@
finally
{
closeStatement(psReference);
- closeStatement(psMessage);
- }
+ closeStatement(psInsertMessage);
+ closeStatement(psUpdateMessage);
+ }
}
}
@@ -1409,7 +1394,8 @@
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
- PreparedStatement psDeleteReference = null;
+ PreparedStatement psUpdateMessage = null;
+ PreparedStatement psDeleteReference = null;
List<Message> messagesStored = new ArrayList<Message>();
@@ -1439,16 +1425,24 @@
{
if (psInsertMessage == null)
{
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+ if (isSupportsBlobSelect())
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
+ }
+ else
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
+ }
}
// First time so add message
- storeMessage(m, psInsertMessage, true);
-
- if (trace) { log.trace("Message does not already exist so inserting it"); }
- rows = psInsertMessage.executeUpdate();
- if (trace) { log.trace("Inserted " + rows + " rows"); }
-
+ // And in case of clustered queues/topics, the message could possibly be already persisted on the different node
+ // so we persist also using the Conditional Update
+ if (trace) { log.trace("Message does not already exist so inserting it"); }
+ rows = storeMessage(m, psInsertMessage, psUpdateMessage);
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
+
m.setPersisted(true);
messagesStored.add(m);
@@ -1488,9 +1482,10 @@
}
finally
{
- closeStatement(psReference);
+ closeStatement(psReference);
closeStatement(psDeleteReference);
closeStatement(psInsertMessage);
+ closeStatement(psUpdateMessage);
}
}
}
@@ -1554,8 +1549,9 @@
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
- PreparedStatement psUpdateReference = null;
-
+ PreparedStatement psUpdateReference = null;
+ PreparedStatement psUpdateMessage = null;
+
List<Message> messagesStored = new ArrayList<Message>();
try
@@ -1591,13 +1587,19 @@
{
if (psInsertMessage == null)
{
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+ if (isSupportsBlobSelect())
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
+ }
+ else
+ {
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
+ }
}
- storeMessage(m, psInsertMessage, true);
+ rows = storeMessage(m, psInsertMessage, psUpdateMessage);
- rows = psInsertMessage.executeUpdate();
-
if (trace) { log.trace("Inserted " + rows + " rows"); }
m.setPersisted(true);
@@ -1644,6 +1646,7 @@
closeStatement(psReference);
closeStatement(psInsertMessage);
closeStatement(psUpdateReference);
+ closeStatement(psUpdateMessage);
}
}
}
@@ -1901,6 +1904,40 @@
}
}
+
+ /** Stores the message using the Conditional update */
+ protected int storeMessage(Message message, PreparedStatement psInsertMessage, PreparedStatement psUpdateMessage)
+ throws Exception
+ {
+ int rows;
+ if (psUpdateMessage != null)
+ {
+ storeMessage(message, psInsertMessage, false);
+ psInsertMessage.setLong(7, message.getMessageID());
+ rows = psInsertMessage.executeUpdate();
+
+ if (rows == 1)
+ {
+ bindBlobs(message, psUpdateMessage, 1, 2);
+ psUpdateMessage.setLong(3, message.getMessageID());
+ rows = psUpdateMessage.executeUpdate();
+ if (rows != 1)
+ {
+ throw new IllegalStateException("Couldn't update messageId=" +
+ message.getMessageID() + " on paging");
+ }
+ }
+ }
+ else
+ {
+ storeMessage(message, psInsertMessage, true);
+ psInsertMessage.setLong(9, message.getMessageID());
+ rows = psInsertMessage.executeUpdate();
+ }
+ return rows;
+ }
+
+
private void bindBlobs(Message m, PreparedStatement ps, int headerPosition, int payloadPosition)
throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java 2007-09-13 15:38:17 UTC (rev 3103)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java 2007-09-13 15:43:00 UTC (rev 3104)
@@ -561,8 +561,8 @@
TextMessage tm = (TextMessage)cons2.receive(1000);
assertNotNull(tm);
- assertEquals(messageIdCorrelate[i], tm.getJMSCorrelationID());
-
+ assertEquals(messageIdCorrelate[i], tm.getJMSMessageID());
+
assertEquals("message3-" + i, tm.getText());
}
More information about the jboss-cvs-commits
mailing list