[jboss-cvs] JBoss Messaging SVN: r3115 - in trunk/src/main/org/jboss: jms/message and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Sep 18 17:20:45 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-09-18 17:20:45 -0400 (Tue, 18 Sep 2007)
New Revision: 3115

Modified:
   trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
   trunk/src/main/org/jboss/jms/message/JBossMessage.java
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1074 and http://jira.jboss.org/jira/browse/JBMESSAGING-1063

Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-09-18 17:05:39 UTC (rev 3114)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-09-18 21:20:45 UTC (rev 3115)
@@ -89,6 +89,8 @@
 
       boolean keepID = args.length>5? ((Boolean)args[5]).booleanValue() : false;
 
+      String keptId = null;
+
       // configure the message for sending, using attributes stored as metadata
 
       ProducerState producerState = getProducerState(mi);
@@ -219,7 +221,7 @@
 
          if (keepID)
          {
-            id = ((MessageProxy)m).getMessage().getMessageID();
+            keptId = m.getJMSMessageID();
          }
 
 
@@ -237,13 +239,15 @@
 
       // Set the new id
 
-      if (!keepID && id == 0l)
+      id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
+      messageToSend.setMessageId(id);
+
+      if (keptId != null)
       {
-         id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
+         messageToSend.setJMSMessageID(keptId);
       }
 
-      messageToSend.setMessageId(id);
-      
+
       // This only really used for BytesMessages and StreamMessages to reset their state
       messageToSend.doBeforeSend(); 
       

Modified: trunk/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMessage.java	2007-09-18 17:05:39 UTC (rev 3114)
+++ trunk/src/main/org/jboss/jms/message/JBossMessage.java	2007-09-18 21:20:45 UTC (rev 3115)
@@ -87,6 +87,9 @@
    private static final String REPLYTO_HEADER_NAME = "H.REPLYTO";
    
    private static final String CORRELATIONID_HEADER_NAME = "H.CORRELATIONID";
+
+   // When the message is sent through the cluster, it needs to keep the original messageID
+   private static final String JBM_MESSAGE_ID = "JBM_MESSAGE_ID";
    
    private static final String CORRELATIONIDBYTES_HEADER_NAME = "H.CORRELATIONIDBYTES";
    
@@ -376,8 +379,16 @@
    public String getJMSMessageID()
    {
       if (jmsMessageID == null)
-      {       
-         jmsMessageID = "ID:JBM-" + messageID;
+      {
+         String headerID = (String)headers.get(JBM_MESSAGE_ID);
+         if (headerID == null)
+         {
+            jmsMessageID = "ID:JBM-" + messageID;
+         }
+         else
+         {
+            jmsMessageID = headerID;
+         }
       }
       return jmsMessageID;
    }
@@ -388,6 +399,14 @@
       {
          throw new JMSException("JMSMessageID must start with ID:");
       }
+      if (jmsMessageID == null)
+      {
+         headers.remove(JBM_MESSAGE_ID);
+      }
+      else
+      {
+         headers.put(JBM_MESSAGE_ID, jmsMessageID);
+      }
       this.jmsMessageID = jmsMessageID;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-18 17:05:39 UTC (rev 3114)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-18 21:20:45 UTC (rev 3115)
@@ -1173,7 +1173,6 @@
 			{
 	         PreparedStatement psReference = null;
             PreparedStatement psInsertMessage = null;
-            PreparedStatement psUpdateMessage = null;
 
 	         Message m = ref.getMessage();     
 	         
@@ -1191,18 +1190,10 @@
 	            if (!m.isPersisted())
 	            {
 	               // First time so persist the message
-                  if (isSupportsBlobSelect())
-                  {
-                     psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
-                  }
-                  else
-                  {
-                     psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
-                     psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
-                  }
-                  
+                  psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
 
-                  rows = storeMessage(m, psInsertMessage, psUpdateMessage);
+                  storeMessage(m, psInsertMessage, true);
+                  rows = psInsertMessage.executeUpdate();
 		            
 		            if (trace) { log.trace("Inserted/updated " + rows + " rows"); }     
 		            
@@ -1218,7 +1209,6 @@
 	         {
 	         	closeStatement(psReference);
                closeStatement(psInsertMessage);
-               closeStatement(psUpdateMessage);
 	         }
 			}   		
    	}
@@ -1398,7 +1388,6 @@
 		         
 		      PreparedStatement psReference = null;
 		      PreparedStatement psInsertMessage = null;
-            PreparedStatement psUpdateMessage = null;
             PreparedStatement psDeleteReference = null;
 		      
 		      List<Message> messagesStored = new ArrayList<Message>();
@@ -1429,22 +1418,15 @@
 			            {   
 			            	if (psInsertMessage == null)
 			            	{
-                           if (isSupportsBlobSelect())
-                           {
-                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
-                           }
-                           else
-                           {
-                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
-                              psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
-                           }
+                           psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
 			            	}
 			            	
 			               // First time so add message
                         // And in case of clustered queues/topics, the message could possibly be already persisted on the different node
                         // so we persist also using the Conditional Update
                         if (trace) { log.trace("Message does not already exist so inserting it"); }
-                        rows = storeMessage(m, psInsertMessage, psUpdateMessage);
+                        storeMessage(m, psInsertMessage, true);
+                        rows = psInsertMessage.executeUpdate();
                         if (trace) { log.trace("Inserted " + rows + " rows"); }
 
 			               m.setPersisted(true);
@@ -1489,7 +1471,6 @@
                closeStatement(psReference);
 		      	closeStatement(psDeleteReference);
 		      	closeStatement(psInsertMessage);
-               closeStatement(psUpdateMessage);
 		      }
 			}   		
    	}   	      
@@ -1554,7 +1535,6 @@
 		      PreparedStatement psReference = null;
 		      PreparedStatement psInsertMessage = null;
             PreparedStatement psUpdateReference = null;
-            PreparedStatement psUpdateMessage = null;
 
 		      List<Message> messagesStored = new ArrayList<Message>();
 		 
@@ -1591,18 +1571,11 @@
 			            {	            	
 			            	if (psInsertMessage == null)
 			            	{
-                           if (isSupportsBlobSelect())
-                           {
-                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
-                           }
-                           else
-                           {
-                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
-                              psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
-                           }
+                           psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
 			            	}
-				            		
-				            rows = storeMessage(m, psInsertMessage, psUpdateMessage);               
+
+                        storeMessage(m, psInsertMessage, true);
+                        rows = psInsertMessage.executeUpdate();
 				            
 			            	if (trace) { log.trace("Inserted " + rows + " rows"); }
 
@@ -1650,7 +1623,6 @@
 		      	closeStatement(psReference);
 		      	closeStatement(psInsertMessage);  
 		      	closeStatement(psUpdateReference);
-               closeStatement(psUpdateMessage);
 		      }
 			}   		
    	}




More information about the jboss-cvs-commits mailing list