[jboss-cvs] JBoss Messaging SVN: r3115 - in trunk/src/main/org/jboss: jms/message and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 18 17:20:45 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-09-18 17:20:45 -0400 (Tue, 18 Sep 2007)
New Revision: 3115
Modified:
trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
trunk/src/main/org/jboss/jms/message/JBossMessage.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1074 and http://jira.jboss.org/jira/browse/JBMESSAGING-1063
Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-09-18 17:05:39 UTC (rev 3114)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java 2007-09-18 21:20:45 UTC (rev 3115)
@@ -89,6 +89,8 @@
boolean keepID = args.length>5? ((Boolean)args[5]).booleanValue() : false;
+ String keptId = null;
+
// configure the message for sending, using attributes stored as metadata
ProducerState producerState = getProducerState(mi);
@@ -219,7 +221,7 @@
if (keepID)
{
- id = ((MessageProxy)m).getMessage().getMessageID();
+ keptId = m.getJMSMessageID();
}
@@ -237,13 +239,15 @@
// Set the new id
- if (!keepID && id == 0l)
+ id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
+ messageToSend.setMessageId(id);
+
+ if (keptId != null)
{
- id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
+ messageToSend.setJMSMessageID(keptId);
}
- messageToSend.setMessageId(id);
-
+
// This is only really used for BytesMessages and StreamMessages to reset their state
messageToSend.doBeforeSend();
Modified: trunk/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMessage.java 2007-09-18 17:05:39 UTC (rev 3114)
+++ trunk/src/main/org/jboss/jms/message/JBossMessage.java 2007-09-18 21:20:45 UTC (rev 3115)
@@ -87,6 +87,9 @@
private static final String REPLYTO_HEADER_NAME = "H.REPLYTO";
private static final String CORRELATIONID_HEADER_NAME = "H.CORRELATIONID";
+
+ // When the message is sent through the cluster, it needs to keep the original messageID
+ private static final String JBM_MESSAGE_ID = "JBM_MESSAGE_ID";
private static final String CORRELATIONIDBYTES_HEADER_NAME = "H.CORRELATIONIDBYTES";
@@ -376,8 +379,16 @@
public String getJMSMessageID()
{
if (jmsMessageID == null)
- {
- jmsMessageID = "ID:JBM-" + messageID;
+ {
+ String headerID = (String)headers.get(JBM_MESSAGE_ID);
+ if (headerID == null)
+ {
+ jmsMessageID = "ID:JBM-" + messageID;
+ }
+ else
+ {
+ jmsMessageID = headerID;
+ }
}
return jmsMessageID;
}
@@ -388,6 +399,14 @@
{
throw new JMSException("JMSMessageID must start with ID:");
}
+ if (jmsMessageID == null)
+ {
+ headers.remove(JBM_MESSAGE_ID);
+ }
+ else
+ {
+ headers.put(JBM_MESSAGE_ID, jmsMessageID);
+ }
this.jmsMessageID = jmsMessageID;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-18 17:05:39 UTC (rev 3114)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-09-18 21:20:45 UTC (rev 3115)
@@ -1173,7 +1173,6 @@
{
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
- PreparedStatement psUpdateMessage = null;
Message m = ref.getMessage();
@@ -1191,18 +1190,10 @@
if (!m.isPersisted())
{
// First time so persist the message
- if (isSupportsBlobSelect())
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
- }
- else
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
- }
-
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- rows = storeMessage(m, psInsertMessage, psUpdateMessage);
+ storeMessage(m, psInsertMessage, true);
+ rows = psInsertMessage.executeUpdate();
if (trace) { log.trace("Inserted/updated " + rows + " rows"); }
@@ -1218,7 +1209,6 @@
{
closeStatement(psReference);
closeStatement(psInsertMessage);
- closeStatement(psUpdateMessage);
}
}
}
@@ -1398,7 +1388,6 @@
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
- PreparedStatement psUpdateMessage = null;
PreparedStatement psDeleteReference = null;
List<Message> messagesStored = new ArrayList<Message>();
@@ -1429,22 +1418,15 @@
{
if (psInsertMessage == null)
{
- if (isSupportsBlobSelect())
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
- }
- else
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
- }
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
}
// First time so add message
// And in case of clustered queues/topics, the message may already have been persisted on a different node,
// so we also persist using the Conditional Update
if (trace) { log.trace("Message does not already exist so inserting it"); }
- rows = storeMessage(m, psInsertMessage, psUpdateMessage);
+ storeMessage(m, psInsertMessage, true);
+ rows = psInsertMessage.executeUpdate();
if (trace) { log.trace("Inserted " + rows + " rows"); }
m.setPersisted(true);
@@ -1489,7 +1471,6 @@
closeStatement(psReference);
closeStatement(psDeleteReference);
closeStatement(psInsertMessage);
- closeStatement(psUpdateMessage);
}
}
}
@@ -1554,7 +1535,6 @@
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
PreparedStatement psUpdateReference = null;
- PreparedStatement psUpdateMessage = null;
List<Message> messagesStored = new ArrayList<Message>();
@@ -1591,18 +1571,11 @@
{
if (psInsertMessage == null)
{
- if (isSupportsBlobSelect())
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
- }
- else
- {
- psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
- }
+ psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
}
-
- rows = storeMessage(m, psInsertMessage, psUpdateMessage);
+
+ storeMessage(m, psInsertMessage, true);
+ rows = psInsertMessage.executeUpdate();
if (trace) { log.trace("Inserted " + rows + " rows"); }
@@ -1650,7 +1623,6 @@
closeStatement(psReference);
closeStatement(psInsertMessage);
closeStatement(psUpdateReference);
- closeStatement(psUpdateMessage);
}
}
}
More information about the jboss-cvs-commits
mailing list