[jboss-cvs] JBoss Messaging SVN: r7865 - in branches/JBMESSAGING-1742: src/main/org/jboss/messaging/core/contract and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 23 08:04:13 EDT 2009


Author: gaohoward
Date: 2009-10-23 08:04:13 -0400 (Fri, 23 Oct 2009)
New Revision: 7865

Modified:
   branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Log:
save work


Modified: branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2009-10-23 12:04:13 UTC (rev 7865)
@@ -84,7 +84,7 @@
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
    DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
-   DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE CHANNEL_ID=?
+   DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2009-10-23 12:04:13 UTC (rev 7865)
@@ -87,6 +87,13 @@
 
    void addTransaction(Transaction tx);
 
+   // Drops everything persisted for the given channel: its messages and the references to them.
+   void dropChannelMessages(long channelID) throws Exception;
+
+   // Merges the persisted messages of channel fromID into channel toID (fromID is the source).
+   void mergeChannelMessage(long fromID, long toID) throws Exception;
+
+
    // Interface value classes ----------------------------------------------------------------------
 
    class MessageChannelPair

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java	2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java	2009-10-23 12:04:13 UTC (rev 7865)
@@ -96,4 +96,6 @@
    Delivery handleMove(MessageReference ref, long sourceChannelID);
 
    void setClustered(boolean isClustered);
+
+   // Merges the persisted messages of the given queue into this queue; MessagingQueue does this
+   // by calling PersistenceManager.mergeChannelMessage(queue's channel id, this channel id).
+   void staticMerge(Queue queue);
 }

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-10-23 12:04:13 UTC (rev 7865)
@@ -1134,6 +1134,169 @@
       }
    }
 
+   /**
+    * Drops a channel's persisted data: runs DELETE_CHANNEL_MESSAGE for each row that LOAD_REFS
+    * returns for the channel (column 1 is used as the message id), then runs
+    * DELETE_CHANNEL_MESSAGE_REF for the channel itself. The work is wrapped in a JDBCTxRunner2
+    * and started through executeWithRetry().
+    * <p>
+    * NOTE(review): messages are deleted by id without checking whether another channel still
+    * holds a reference to them - confirm this is intended for shared messages.
+    *
+    * @param channelID id of the channel whose messages and references are deleted
+    * @throws Exception whatever executeWithRetry() propagates
+    * @see org.jboss.messaging.core.contract.PersistenceManager#dropChannelMessages(long)
+    */
+   public void dropChannelMessages(final long channelID) throws Exception
+   {
+      class ChannelDropRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psRefs = null;
+            PreparedStatement psDeleteMsg = null;
+            ResultSet rs = null;
+
+            try
+            {
+               psRefs = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+               psRefs.setLong(1, channelID);
+               rs = psRefs.executeQuery();
+
+               psDeleteMsg = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+               while (rs.next())
+               {
+                  psDeleteMsg.setLong(1, rs.getLong(1));
+                  psDeleteMsg.executeUpdate();
+               }
+
+               // Release the cursor and its statement before the refs it iterated are deleted
+               // and before the variable is reused for the next statement.
+               rs.close();
+               psRefs.close();
+
+               psRefs = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+               psRefs.setLong(1, channelID);
+               int rows = psRefs.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Deleted " + rows + " message refs of channel " + channelID);
+               }
+            }
+            finally
+            {
+               // Also covers the failure paths, where the explicit closes above were not reached.
+               closeResultSet(rs);
+               closeStatement(psRefs);
+               closeStatement(psDeleteMsg);
+            }
+            return null;
+         }
+      }
+
+      new ChannelDropRunner().executeWithRetry();
+   }
+
+   /**
+    * Moves the persisted message references of channel fromID to channel toID.
+    * <p>
+    * The second column of SELECT_MIN_MAX_PAGE_ORD for toID is read as that channel's highest
+    * page order. When it is not SQL NULL, every row LOAD_REFS returns for fromID is first given
+    * a new page order through UPDATE_PAGE_ORDER, counting up from that value + 1. Afterwards
+    * UPDATE_CHANNEL_ID re-assigns the rows from fromID to toID. The work is wrapped in a
+    * JDBCTxRunner2 and started through executeWithRetry().
+    *
+    * @param fromID id of the channel the messages are taken from
+    * @param toID id of the channel the messages are added to
+    * @throws IllegalArgumentException if both ids are the same
+    * @throws Exception whatever executeWithRetry() propagates
+    * @see org.jboss.messaging.core.contract.PersistenceManager#mergeChannelMessage(long, long)
+    */
+   public void mergeChannelMessage(final long fromID, final long toID) throws Exception
+   {
+      if (fromID == toID)
+      {
+         throw new IllegalArgumentException("Cannot merge queues - they have the same channel id!!");
+      }
+
+      class ChannelMergeRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            PreparedStatement psUpdateOrder = null;
+            ResultSet rs = null;
+
+            try
+            {
+               // Next page order to hand out in the target channel; 0 means there is no
+               // existing ordering in toID, so the moved rows keep the order they have.
+               long nextPageOrd = 0;
+
+               ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
+               ps.setLong(1, toID);
+               rs = ps.executeQuery();
+
+               // An empty result is treated like a NULL maximum instead of failing in getLong().
+               if (rs.next())
+               {
+                  long maxOrdering = rs.getLong(2);
+
+                  // getLong() returns 0 for SQL NULL; only wasNull() tells the two apart.
+                  if (!rs.wasNull())
+                  {
+                     nextPageOrd = maxOrdering + 1;
+                  }
+               }
+
+               rs.close();
+               ps.close();
+
+               if (nextPageOrd > 0)
+               {
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+                  psUpdateOrder = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+                  ps.setLong(1, fromID);
+                  rs = ps.executeQuery();
+
+                  while (rs.next())
+                  {
+                     long msgId = rs.getLong(1);
+
+                     psUpdateOrder.setLong(1, nextPageOrd);
+                     psUpdateOrder.setLong(2, msgId);
+                     psUpdateOrder.setLong(3, fromID);
+
+                     int rows = psUpdateOrder.executeUpdate();
+
+                     if (trace)
+                     {
+                        log.trace("Update page ord updated " + rows + " rows");
+                     }
+
+                     nextPageOrd++;
+                  }
+
+                  // Release the cursor before its statement, then both statements, so nothing
+                  // is left open when ps is reused for the channel id update below.
+                  rs.close();
+                  psUpdateOrder.close();
+                  ps.close();
+               }
+
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+               ps.setLong(1, toID);
+               ps.setLong(2, fromID);
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Update channel id updated " + rows + " rows");
+               }
+            }
+            finally
+            {
+               // Also covers the failure paths, where the explicit closes above were not reached.
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(psUpdateOrder);
+            }
+            return null;
+         }
+      }
+
+      new ChannelMergeRunner().executeWithRetry();
+   }
+
    public InitialLoadInfo mergeAndLoad(final long fromChannelID,
          final long toChannelID, final int numberToLoad,
          final long firstPagingOrder, final long nextPagingOrder)
@@ -2668,8 +2831,6 @@
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("DELETE_MESSAGE",
               "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
-      map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
-      map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE CHANNEL_ID=?");
 
       
       // Transaction

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2009-10-23 12:04:13 UTC (rev 7865)
@@ -201,6 +201,13 @@
    	return remoteDistributor;
    }
    
+   /**
+    * Merges the persisted messages of another queue into this one by asking the persistence
+    * manager to move them from the other queue's channel id to this queue's channel id.
+    *
+    * @param fromQueue the queue whose persisted messages are moved into this queue
+    * @throws IllegalStateException if the persistence manager fails with a checked exception;
+    *         the original exception is kept as the cause
+    */
+   public void staticMerge(Queue fromQueue)
+   {
+      long fromChannelID = fromQueue.getChannelID();
+
+      // The source of the merge is fromQueue, not this queue.
+      if (trace) { log.trace("Merging queue " + fromChannelID + " into " + this + 
+                             " statically."); }
+
+      try
+      {
+         pm.mergeChannelMessage(fromChannelID, channelID);
+      }
+      catch (RuntimeException e)
+      {
+         // e.g. the IllegalArgumentException raised for identical channel ids - pass through
+         throw e;
+      }
+      catch (Exception e)
+      {
+         // Queue.staticMerge declares no checked exception, so the failure is rethrown unchecked.
+         throw new IllegalStateException("Failed to merge channel " + fromChannelID +
+                                         " into channel " + channelID, e);
+      }
+   }
+   
    /**
     * Merge the contents of one queue with another - this happens at failover when a queue is failed
     * over to another node, but a queue with the same name already exists. In this case we merge the

Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-23 12:04:13 UTC (rev 7865)
@@ -783,7 +783,7 @@
          }
          else
          {
-            mergeDBChannelMessages(allBindings, b.queue.getChannelID());
+            mergeDBChannelMessages(allBindings, b.queue);
          }
       }
 
@@ -814,97 +814,35 @@
       {
          return;
       }
-      class RemoveChannelMessages extends JDBCTxRunner<Object>
-      {
-         public Object doTransaction() throws Exception
-         {
-            PreparedStatement ps1 = null, ps2  = null;
+      Iterator itBindings = allBindings.iterator();
 
-            try
-            {
-               ps1 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
-               ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
-               
-               Iterator itBindings = allBindings.iterator();
-
-               while (itBindings.hasNext())
-               {
-                  Binding bd = (Binding)itBindings.next();
-                  long channelID = bd.queue.getChannelID();
-
-                  ps1.setLong(1, channelID);
-
-                  int rows1 = ps1.executeUpdate();
-                  
-                  ps2.setLong(1, channelID);
-                  
-                  int rows2 = ps2.executeUpdate();
-
-                  if (trace)
-                  {
-                     log.trace(rows1 + " rows deleted from channel " + channelID);
-                  }
-               }
-
-               return null;
-            }
-            finally
-            {
-               closeStatement(ps1);
-               closeStatement(ps2);
-            }
-         }
-      }
-
-      new RemoveChannelMessages().executeWithRetry();
-      
+      while (itBindings.hasNext())
+      {
+         Binding bd = (Binding)itBindings.next();
+         long channelID = bd.queue.getChannelID();
+         pm.dropChannelMessages(channelID);
+      }      
    }
    
-   private void mergeDBChannelMessages(final Collection allBindings, final long toChannelID) throws Exception
+   private void mergeDBChannelMessages(final Collection allBindings, Queue localQueue) throws Exception
    {
       if (ds == null)
       {
          return;
       }
-      class MergeChannelMessages extends JDBCTxRunner<Object>
+      
+      //do a staticMerge
+      Iterator itBindings = allBindings.iterator();
+
+      while (itBindings.hasNext())
       {
-         public Object doTransaction() throws Exception
+         Binding bd = (Binding)itBindings.next();
+         long fromChannelID = bd.queue.getChannelID();
+         if (fromChannelID != localQueue.getChannelID())
          {
-            PreparedStatement ps  = null;
-
-            try
-            {
-               ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
-               
-               Iterator itBindings = allBindings.iterator();
-
-               ps.setLong(1, toChannelID);
-
-               while (itBindings.hasNext())
-               {
-                  Binding bd = (Binding)itBindings.next();
-                  long fromChannelID = bd.queue.getChannelID();
-
-                  ps.setLong(2, fromChannelID);
-
-                  int rows = ps.executeUpdate();
-
-                  if (trace)
-                  {
-                     log.trace(rows + " rows updated from channel " + fromChannelID + " to channel: " + toChannelID);
-                  }
-               }
-
-               return null;
-            }
-            finally
-            {
-               closeStatement(ps);
-            }
+            localQueue.staticMerge(bd.queue);
          }
       }
-
-      new MergeChannelMessages().executeWithRetry();      
    }
 
    public void injectServerPeer(ServerPeer serverPeer)

Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-23 12:04:13 UTC (rev 7865)
@@ -77,6 +77,7 @@
    
    protected void setUp() throws Exception
    {
+      super.setUp();
       sc = new ServiceContainer("transaction");
       
       //Don't drop the tables again!
@@ -152,7 +153,7 @@
       //Restart nodes
       for (int i = 0; i < nodeCount; i++)
       {
-         startDefaultServer(i, overrides, i == 0);
+         startDefaultServer(i, overrides, false);
       }
       
       //redeploy
@@ -179,6 +180,11 @@
             producer.send(msg);
          }
       }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
       finally
       {
          conn.close();




More information about the jboss-cvs-commits mailing list