[jboss-cvs] JBoss Messaging SVN: r3104 - in trunk: src/main/org/jboss/messaging/core/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Sep 13 11:43:00 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-09-13 11:43:00 -0400 (Thu, 13 Sep 2007)
New Revision: 3104

Modified:
   trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
   trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1063

Modified: trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-09-13 15:38:17 UTC (rev 3103)
+++ trunk/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-09-13 15:43:00 UTC (rev 3104)
@@ -60,27 +60,27 @@
  * $Id$
  */
 public class ProducerAspect
-{   
+{
    // Constants ------------------------------------------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(ProducerAspect.class);
-   
+
    // Attributes -----------------------------------------------------------------------------------
-   
+
    private boolean trace = log.isTraceEnabled();
-   
+
    // Static ---------------------------------------------------------------------------------------
-   
+
    // Constructors ---------------------------------------------------------------------------------
-   
+
    // Public ---------------------------------------------------------------------------------------
 
    public Object handleSend(Invocation invocation) throws Throwable
-   { 
+   {
       MethodInvocation mi = (MethodInvocation)invocation;
-      
+
       Object[] args = mi.getArguments();
-      
+
       Destination destination = (Destination)args[0];
       Message m = (Message)args[1];
       int deliveryMode = ((Integer)args[2]).intValue();
@@ -89,13 +89,6 @@
 
       boolean keepID = args.length>5? ((Boolean)args[5]).booleanValue() : false;
 
-      String correlatedMessage = null;
-
-      if (keepID)
-      {
-         correlatedMessage = m.getJMSMessageID();
-      }
-
       // configure the message for sending, using attributes stored as metadata
 
       ProducerState producerState = getProducerState(mi);
@@ -131,7 +124,7 @@
          timeToLive = producerState.getTimeToLive();
          if (trace) { log.trace("Using producer's default timeToLive: " + timeToLive); }
       }
-      
+
       if (timeToLive == 0)
       {
          // Zero implies never expires
@@ -146,7 +139,7 @@
       {
          // use destination from producer
          destination = producerState.getDestination();
-         
+
          if (destination == null)
          {
             throw new UnsupportedOperationException("Destination not specified");
@@ -168,15 +161,14 @@
                                                     "these destinations must be equal");
          }
       }
-      
+
       SessionState sessionState = (SessionState)producerState.getParent();
-                  
+
       // Generate the message id
       ConnectionState connectionState = (ConnectionState)sessionState.getParent();
-      
-      long id =
-         connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
 
+      long id = 0;
+
       JBossMessage messageToSend;
       boolean foreign = false;
 
@@ -185,7 +177,7 @@
          // it's a foreign message
 
          foreign = true;
-         
+
          // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
          // a message whose implementation is not one of its own.
 
@@ -214,37 +206,43 @@
          {
             messageToSend = new JBossMessage(m, 0);
          }
-         
+
          messageToSend.setJMSMessageID(null);
-         
+
          //We must set the destination *after* converting from foreign message
-         messageToSend.setJMSDestination(destination);              
+         messageToSend.setJMSDestination(destination);
       }
       else
       {
          // get the actual message
          MessageProxy proxy = (MessageProxy)m;
 
+         if (keepID)
+         {
+            id = ((MessageProxy)m).getMessage().getMessageID();
+         }
+
+
          m.setJMSDestination(destination);
-                                    
+
          //The following line executed on the proxy should cause a copy to occur
          //if it is necessary
          proxy.setJMSMessageID(null);
-         
+
          //Get the underlying message
-         messageToSend = proxy.getMessage();     
-         
+         messageToSend = proxy.getMessage();
+
          proxy.beforeSend();
       }
-          
+
       // Set the new id
-      
-      messageToSend.setMessageId(id);
 
-      if (correlatedMessage != null)
+      if (!keepID && id == 0l)
       {
-         messageToSend.setJMSCorrelationID(correlatedMessage);
+         id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
       }
+
+      messageToSend.setMessageId(id);
       
       // This only really used for BytesMessages and StreamMessages to reset their state
       messageToSend.doBeforeSend(); 

Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-13 15:38:17 UTC (rev 3103)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-09-13 15:43:00 UTC (rev 3104)
@@ -642,33 +642,9 @@
 	         		//Maybe we need to persist the message itself
 	         		Message m = ref.getMessage();
 
-                  storeMessage(m, psInsertMessage, false);
 
+                  rows = storeMessage(m, psInsertMessage, psUpdateMessage);
 
-                  if (psUpdateMessage != null)
-                  {
-                     psInsertMessage.setLong(7, m.getMessageID());
-                     rows = psInsertMessage.executeUpdate();
-
-                     if (rows == 1)
-                     {
-                        bindBlobs(m, psUpdateMessage, 1, 2);
-                        psUpdateMessage.setLong(3, m.getMessageID());
-                        rows = psUpdateMessage.executeUpdate();
-                        if (rows != 1)
-                        {
-                           throw new IllegalStateException("Couldn't update messageId=" +
-                              m.getMessageID() + " on paging");
-                        }
-                     }
-                  }
-                  else
-                  {
-                     bindBlobs(m, psInsertMessage, 7, 8);
-                     psInsertMessage.setLong(9, m.getMessageID());
-                     rows = psInsertMessage.executeUpdate();
-                  }
-
                   if (trace) { log.trace("Inserted " + rows + " rows"); }
 	         	} 
 	         	
@@ -680,7 +656,7 @@
                closeStatement(psInsertMessage);
                closeStatement(psUpdateMessage);
       		}
-   		}      	      	      	
+   		}
       }
    	
    	new PageReferencesRunner().executeWithRetry();   	
@@ -1192,8 +1168,9 @@
 			public Object doTransaction() throws Exception
 			{
 	         PreparedStatement psReference = null;
-	         PreparedStatement psMessage = null;
-	         
+            PreparedStatement psInsertMessage = null;
+            PreparedStatement psUpdateMessage = null;
+
 	         Message m = ref.getMessage();     
 	         
 	         try
@@ -1210,11 +1187,18 @@
 	            if (!m.isPersisted())
 	            {
 	               // First time so persist the message
-	               psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-	               
-	               storeMessage(m, psMessage, true);
-	                                    
-		            rows = psMessage.executeUpdate();
+                  if (isSupportsBlobSelect())
+                  {
+                     psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
+                  }
+                  else
+                  {
+                     psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
+                     psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
+                  }
+                  
+
+                  rows = storeMessage(m, psInsertMessage, psUpdateMessage);
 		            
 		            if (trace) { log.trace("Inserted/updated " + rows + " rows"); }     
 		            
@@ -1229,8 +1213,9 @@
 	         finally
 	         {
 	         	closeStatement(psReference);
-	         	closeStatement(psMessage);
-	         }      
+               closeStatement(psInsertMessage);
+               closeStatement(psUpdateMessage);
+	         }
 			}   		
    	}
    	   	
@@ -1409,7 +1394,8 @@
 		         
 		      PreparedStatement psReference = null;
 		      PreparedStatement psInsertMessage = null;
-		      PreparedStatement psDeleteReference = null;
+            PreparedStatement psUpdateMessage = null;
+            PreparedStatement psDeleteReference = null;
 		      
 		      List<Message> messagesStored = new ArrayList<Message>();
 
@@ -1439,16 +1425,24 @@
 			            {   
 			            	if (psInsertMessage == null)
 			            	{
-			            		psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+                           if (isSupportsBlobSelect())
+                           {
+                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
+                           }
+                           else
+                           {
+                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
+                              psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
+                           }
 			            	}
 			            	
 			               // First time so add message
-			               storeMessage(m, psInsertMessage, true);
-			                 
-		                  if (trace) { log.trace("Message does not already exist so inserting it"); }
-		                  rows = psInsertMessage.executeUpdate();
-		                  if (trace) { log.trace("Inserted " + rows + " rows"); }
-			               
+                        // In the case of clustered queues/topics, the message may already have been persisted by a different node,
+                        // so we also persist it here using the conditional insert/update statements
+                        if (trace) { log.trace("Message does not already exist so inserting it"); }
+                        rows = storeMessage(m, psInsertMessage, psUpdateMessage);
+                        if (trace) { log.trace("Inserted " + rows + " rows"); }
+
 			               m.setPersisted(true);
 			               
 			               messagesStored.add(m);
@@ -1488,9 +1482,10 @@
 		      }
 		      finally
 		      {
-		      	closeStatement(psReference);
+               closeStatement(psReference);
 		      	closeStatement(psDeleteReference);
 		      	closeStatement(psInsertMessage);
+               closeStatement(psUpdateMessage);
 		      }
 			}   		
    	}   	      
@@ -1554,8 +1549,9 @@
 		      
 		      PreparedStatement psReference = null;
 		      PreparedStatement psInsertMessage = null;
-		      PreparedStatement psUpdateReference = null;
-		      
+            PreparedStatement psUpdateReference = null;
+            PreparedStatement psUpdateMessage = null;
+
 		      List<Message> messagesStored = new ArrayList<Message>();
 		 
 		      try
@@ -1591,13 +1587,19 @@
 			            {	            	
 			            	if (psInsertMessage == null)
 			            	{
-			            		psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
+                           if (isSupportsBlobSelect())
+                           {
+                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL_FULL"));
+                           }
+                           else
+                           {
+                              psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_CONDITIONAL"));
+                              psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_4CONDITIONAL"));
+                           }
 			            	}
 				            		
-				            storeMessage(m, psInsertMessage, true);               
+				            rows = storeMessage(m, psInsertMessage, psUpdateMessage);               
 				            
-				            rows = psInsertMessage.executeUpdate();
-			
 			            	if (trace) { log.trace("Inserted " + rows + " rows"); }
 
 				            m.setPersisted(true);				   			
@@ -1644,6 +1646,7 @@
 		      	closeStatement(psReference);
 		      	closeStatement(psInsertMessage);  
 		      	closeStatement(psUpdateReference);
+               closeStatement(psUpdateMessage);
 		      }
 			}   		
    	}
@@ -1901,6 +1904,40 @@
       }
    }
 
+
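+   /** Stores the message with the conditional insert (followed by a blob update when psUpdateMessage is non-null); returns the row count of the last statement executed. */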
+   protected int storeMessage(Message message, PreparedStatement psInsertMessage, PreparedStatement psUpdateMessage)
+      throws Exception
+   {
+      int rows;
+      if (psUpdateMessage != null)
+      {
+         storeMessage(message, psInsertMessage, false);
+         psInsertMessage.setLong(7, message.getMessageID());
+         rows = psInsertMessage.executeUpdate();
+
+         if (rows == 1)
+         {
+            bindBlobs(message, psUpdateMessage, 1, 2);
+            psUpdateMessage.setLong(3, message.getMessageID());
+            rows = psUpdateMessage.executeUpdate();
+            if (rows != 1)
+            {
+               throw new IllegalStateException("Couldn't update messageId=" +
+                  message.getMessageID() + " on paging");
+            }
+         }
+      }
+      else
+      {
+         storeMessage(message, psInsertMessage, true);
+         psInsertMessage.setLong(9, message.getMessageID());
+         rows = psInsertMessage.executeUpdate();
+      }
+      return rows;
+   }
+
+
    private void bindBlobs(Message m, PreparedStatement ps, int headerPosition, int payloadPosition)
       throws Exception
    {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java	2007-09-13 15:38:17 UTC (rev 3103)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java	2007-09-13 15:43:00 UTC (rev 3104)
@@ -561,8 +561,8 @@
             TextMessage tm = (TextMessage)cons2.receive(1000);
 
             assertNotNull(tm);
-            assertEquals(messageIdCorrelate[i], tm.getJMSCorrelationID());
-            
+            assertEquals(messageIdCorrelate[i], tm.getJMSMessageID());
+
             assertEquals("message3-" + i, tm.getText());
          }                 
 




More information about the jboss-cvs-commits mailing list