[jboss-cvs] JBoss Messaging SVN: r7860 - in branches/JBMESSAGING-1742: src/main/org/jboss/jms/server/destination and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 21 06:01:55 EDT 2009


Author: gaohoward
Date: 2009-10-21 06:01:54 -0400 (Wed, 21 Oct 2009)
New Revision: 7860

Modified:
   branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java
   branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java
   branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
Log:
first commit


Modified: branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2009-10-21 10:01:54 UTC (rev 7860)
@@ -83,6 +83,8 @@
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
+   DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
+   DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE CHANNEL_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -632,6 +632,16 @@
 
    protected abstract boolean isQueue();
 
+   public void setDropOldMessageOnRedeploy(boolean dropOldMessageOnRedeploy)
+   {
+      destination.setDropOldMessageOnRedeploy(dropOldMessageOnRedeploy);
+   }
+
+   public boolean isDropOldMessageOnRedeploy()
+   {
+      return destination.isDropOldMessageOnRedeploy();
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -90,6 +90,8 @@
    protected int messageCounterHistoryDayLimit = -1;
    
    protected int maxDeliveryAttempts = -1;
+   
+   protected boolean dropOldMessageOnRedeploy;
     
    public ManagedDestination()
    {      
@@ -318,4 +320,14 @@
    {   
       //NOOP
    }
+
+   public void setDropOldMessageOnRedeploy(boolean drop)
+   {
+      dropOldMessageOnRedeploy = drop;
+   }
+
+   public boolean isDropOldMessageOnRedeploy()
+   {
+      return dropOldMessageOnRedeploy;
+   }
 }

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -87,20 +87,23 @@
          	
          	//Sanity check - currently it is not possible to change the clustered attribute of a destination
          	//See http://jira.jboss.org/jira/browse/JBMESSAGING-1235
+            //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+            if (po.isClustered())
+            {
+               if (destination.isClustered() != queue.isClustered())
+               {
+                  
+                  log.warn("Queue " + destination.getName() 
+                           + " previous clustered attribute is " + queue.isClustered() 
+                           + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+
+                  queue = po.convertDestination(destination);
+               }
+            }
          	
-         	boolean actuallyClustered = po.isClustered() && destination.isClustered();
-         	
-         	if (actuallyClustered != queue.isClustered())
-         	{
-         		throw new IllegalArgumentException("Queue " + destination.getName() + " is already deployed as clustered = " +
-         				                             queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
-         				                             " . You must delete the destination first before redeploying");
-         		
-         	}
-         	
             queue.setPagingParams(destination.getFullSize(),
-                                  destination.getPageSize(),
-                                  destination.getDownCacheSize());  
+                               destination.getPageSize(),
+                               destination.getDownCacheSize());  
             
             queue.load();
                

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -75,15 +75,17 @@
          {
             Queue queue = (Queue)iter.next();
             
-            boolean actuallyClustered = po.isClustered() && destination.isClustered();
-            
-            if (actuallyClustered != queue.isClustered())
-         	{
-         		throw new IllegalArgumentException("Topic " + destination.getName() + " is already deployed as clustered = " +
-         				                             queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
-         				                             " . You must delete the destination first before redeploying");
-         		
-         	}
+            //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+            if (po.isClustered())
+            {
+               if (destination.isClustered() != queue.isClustered())
+               {
+                  log.warn("Topic " + destination.getName() 
+                           + " previous clustered attribute is " + queue.isClustered() 
+                           + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+                  queue = po.convertDestination(destination);
+               }
+            }
                      
             //TODO We need to set the paging params this way since the post office doesn't store them
             //instead we should never create queues inside the postoffice - only do it at deploy time

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.jboss.jms.server.destination.ManagedDestination;
 import org.jboss.messaging.core.impl.tx.Transaction;
 
 /**
@@ -155,5 +156,10 @@
 	Map getRecoveryArea(String queueName);
    
    int getRecoveryMapSize(String queueName);
+
+   /**
+    * change the destination's clustered state.
+    */
+   Queue convertDestination(ManagedDestination destination) throws Throwable;
 }
 

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -94,4 +94,6 @@
 
    //Optimisation for shared database
    Delivery handleMove(MessageReference ref, long sourceChannelID);
+
+   void setClustered(boolean isClustered);
 }

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -2668,6 +2668,10 @@
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("DELETE_MESSAGE",
               "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+      map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
+      map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE CHANNEL_ID=?");
+
+      
       // Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -708,4 +708,9 @@
 			}
 		}   	
    }
+
+   public void setClustered(boolean isClustered)
+   {
+      clustered = isClustered;
+   }
 }

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -49,7 +49,9 @@
 import javax.transaction.TransactionManager;
 
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.server.JMSCondition;
 import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.destination.ManagedDestination;
 import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
@@ -484,7 +486,7 @@
    	
    	return added;
    }
-                
+
    public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
    {
    	Binding binding = internalRemoveBinding(queueName, allNodes, true);
@@ -755,7 +757,157 @@
 	   }
 	}
 	
-	public void injectServerPeer(ServerPeer serverPeer)
+	/*
+	 * Convert an existing destination: 
+	 * if it is clustered, change it to be non-clustered.
+	 * if it is non-clustered, change it to be clustered.
+	 * 
+	 * Note: this method should be called during the destination loading time, i.e.
+	 * the queue is not activated yet. Only applicable to clustered post office.
+	 */
+   public Queue convertDestination(ManagedDestination newDest) throws Throwable
+   {
+      Binding b = getBindingForQueueName(newDest.getName());
+      if (b.queue.isActive())
+      {
+         throw new IllegalStateException(this + " cannot convert the destination " + b.queue + " because it is in active state");
+      }
+      
+      //if a destination changes from clustered to standalone, collect the messages from all channels
+      if ( !newDest.isClustered() )
+      {
+         Collection allBindings = getAllBindingsForQueueName(newDest.getName());
+         if (newDest.isDropOldMessageOnRedeploy())
+         {
+            removeDBChannelMessages(allBindings);
+         }
+         else
+         {
+            mergeDBChannelMessages(allBindings, b.queue.getChannelID());
+         }
+      }
+
+      //if the old destination is clustered, we need to remove from all nodes (queue and durable subs)
+      b = removeBinding(newDest.getName(), !newDest.isClustered());
+
+      b.queue.setClustered(newDest.isClustered());
+      
+      if (newDest.isQueue())
+      {
+         JMSCondition queueCond = new JMSCondition(true, newDest.getName());
+         addBinding(new Binding(queueCond, b.queue, false), false);
+      }
+      else
+      {
+         b.queue.setClustered(newDest.isClustered());
+
+         JMSCondition queueCond = new JMSCondition(false, newDest.getName());
+         addBinding(new Binding(queueCond, b.queue, newDest.isClustered()), newDest.isClustered());
+      }
+      
+      return b.queue;
+   }
+   
+   private void removeDBChannelMessages(final Collection allBindings) throws Exception
+   {
+      if (ds == null)
+      {
+         return;
+      }
+      class RemoveChannelMessages extends JDBCTxRunner<Object>
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps1 = null, ps2  = null;
+
+            try
+            {
+               ps1 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+               ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+               
+               Iterator itBindings = allBindings.iterator();
+
+               while (itBindings.hasNext())
+               {
+                  Binding bd = (Binding)itBindings.next();
+                  long channelID = bd.queue.getChannelID();
+
+                  ps1.setLong(1, channelID);
+
+                  int rows1 = ps1.executeUpdate();
+                  
+                  ps2.setLong(1, channelID);
+                  
+                  int rows2 = ps2.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace(rows1 + " rows deleted from channel " + channelID);
+                  }
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(ps1);
+               closeStatement(ps2);
+            }
+         }
+      }
+
+      new RemoveChannelMessages().executeWithRetry();
+      
+   }
+   
+   private void mergeDBChannelMessages(final Collection allBindings, final long toChannelID) throws Exception
+   {
+      if (ds == null)
+      {
+         return;
+      }
+      class MergeChannelMessages extends JDBCTxRunner<Object>
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps  = null;
+
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+               
+               Iterator itBindings = allBindings.iterator();
+
+               ps.setLong(1, toChannelID);
+
+               while (itBindings.hasNext())
+               {
+                  Binding bd = (Binding)itBindings.next();
+                  long fromChannelID = bd.queue.getChannelID();
+
+                  ps.setLong(2, fromChannelID);
+
+                  int rows = ps.executeUpdate();
+
+                  if (trace)
+                  {
+                     log.trace(rows + " rows updated from channel " + fromChannelID + " to channel: " + toChannelID);
+                  }
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+         }
+      }
+
+      new MergeChannelMessages().executeWithRetry();      
+   }
+
+   public void injectServerPeer(ServerPeer serverPeer)
 	{
 		this.serverPeer = serverPeer;
 	}
@@ -2620,7 +2772,6 @@
       groupMember.unicastData(request, address);
    }
    
-   
    private Map getBindingsFromStorage() throws Exception
    {
    	if (ds == null)

Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2009-10-21 10:01:54 UTC (rev 7860)
@@ -392,6 +392,10 @@
       {
          return null;
       }
+
+      public void setClustered(boolean isClustered)
+      {
+      }
    }
 
    // Inner classes -------------------------------------------------




More information about the jboss-cvs-commits mailing list