[jboss-cvs] JBoss Messaging SVN: r1744 - branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Dec 9 13:53:58 EST 2006


Author: juha at jboss.org
Date: 2006-12-09 13:53:48 -0500 (Sat, 09 Dec 2006)
New Revision: 1744

Modified:
   branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
Log:
Rewrite of the patch for JBMESSAGING-407: simplifies the requirements placed on the PersistenceManager interface and removes unneeded methods.

Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-12-09 18:50:10 UTC (rev 1743)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-12-09 18:53:48 UTC (rev 1744)
@@ -214,15 +214,16 @@
    protected String selectAllChannels = "SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE";
    
    //RECOVERY --MK
-   protected String selectMessageIdForRef = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+   protected String selectMessageIdForRef = "SELECT MESSAGEID, CHANNELID FROM JMS_MESSAGE_REFERENCE "
 	      + "WHERE TRANSACTIONID = ? AND STATE = '+' ";
 
-   protected String selectMessageIdForAck = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+   protected String selectMessageIdForAck = "SELECT MESSAGEID, CHANNELID FROM JMS_MESSAGE_REFERENCE "
 	      + "WHERE TRANSACTIONID = ? AND STATE = '-' ";
 
-   protected String selectChannelId = "SELECT CHANNELLID FROM JMS_MESSAGE_REFERENCE "
-	      + "WHERE TRANSACTIONID = ?";
 
+
+
+
    // Static --------------------------------------------------------
    
    // Attributes ----------------------------------------------------
@@ -252,7 +253,8 @@
    protected int minOrdering;
    
    protected Map channelMultipliers;
-   
+
+
    // Constructors --------------------------------------------------
    
    public JDBCPersistenceManager() throws Exception
@@ -283,6 +285,7 @@
       channelMultipliers = new ConcurrentReaderHashMap();
    }
 
+
    // ServiceMBeanSupport overrides ---------------------------------
    
    protected void startService() throws Exception
@@ -347,7 +350,8 @@
    {
       log.debug(this + " stopped");
    }
-   
+
+
    // PersistenceManager implementation -------------------------
    
    public Object getInstance()
@@ -605,117 +609,21 @@
       }
    }
    
-   public ChannelMapper getChannelMapperReference()
+   public ChannelMapper getChannels()
    {
 	   return cm;
    }
 
-   public long getMessageIdForRef(long transactionId) throws Exception
+   public Map getMessageChannelPairForTx(long transactionId) throws Exception
    {
-	   return getId(selectMessageIdForRef, transactionId, 1);
+	   return getMessageChannelPair(selectMessageIdForRef, transactionId);
    }
-   public long getMessageIdForAck(long transactionId) throws Exception
+   public List getAcksForTx(long transactionId) throws Exception
    {
-	   return getId(selectMessageIdForAck, transactionId, 1);
+	   return new ArrayList(getMessageChannelPair(selectMessageIdForAck, transactionId).keySet());
    }
 
-   public long getChannelId(long transactionId) throws Exception
-   {
-	   return getId(selectChannelId, transactionId, 1);
-   }
 
-
-   /**
-    * This method associates a TransactionCallback to the
-    * resurrected transaction. This method is called from
-    * the TransactionRepository once a transaction object
-    * is resurrected.
-    */
-   public void associateTxCallbackToPreparedTx(Transaction tx)
-   {
-	  TransactionCallback callback = (TransactionCallback) tx.getKeyedCallback(this);
-
-      if (callback == null)
-      {
-         callback = new TransactionCallback(tx);
-
-         tx.addKeyedCallback(callback, this);
-      }
-   }
-
-   /**
-    * MK
-    * Load the message refs for the corresponding tx id
-    */
-   public long getId(String sqlQuery, long transactionId, int index) throws Exception
-   {
-      if (trace) { log.trace("loading references for tx [" + transactionId + "]"); }
-
-      Connection conn = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      TransactionWrapper wrap = new TransactionWrapper();
-      long messageId = 0;
-
-      try
-      {
-         conn = ds.getConnection();
-
-         ps = conn.prepareStatement(sqlQuery);
-
-         ps.setLong(1, transactionId);
-
-         rs = ps.executeQuery();
-
-         while(rs.next())
-         {
-        	   messageId = rs.getLong(index);
-         }
-
-         return messageId;
-      }
-      catch (Exception e)
-      {
-         wrap.exceptionOccurred();
-         throw e;
-      }
-      finally
-      {
-         if (rs != null)
-         {
-            try
-            {
-               rs.close();
-            }
-            catch (Throwable e)
-            {
-            }
-         }
-         if (ps != null)
-         {
-            try
-            {
-               ps.close();
-            }
-            catch (Throwable e)
-            {
-            }
-         }
-         if (conn != null)
-         {
-            try
-            {
-               conn.close();
-            }
-            catch (Throwable e)
-            {
-            }
-         }
-         wrap.end();
-      }
-   }
-
-
    /*
     * Retrieve a List of messages corresponding to the specified List of message ids.
     * The implementation here for HSQLDB does this by using a PreparedStatment with an IN clause
@@ -2602,9 +2510,12 @@
       selectAllChannels = sqlProperties.getProperty("SELECT_ALL_CHANNELS", selectAllChannels);
 
       //recovery
-      selectMessageIdForRef = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_REFS",selectMessageIdForRef);
-      selectMessageIdForAck = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_ACKS",selectMessageIdForAck);
-      selectChannelId  = sqlProperties.getProperty("SELECT_CHANNEL_ID",selectChannelId );
+
+      // TODO: add props to config files [JPL]
+      selectMessageIdForRef = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_REFS", selectMessageIdForRef);
+      selectMessageIdForAck = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_ACKS", selectMessageIdForAck);
+
+
    }
    
    protected TransactionCallback getCallback(Transaction tx)
@@ -4570,7 +4481,94 @@
 
       return tm;
    }
-   
+
+
+   private Map getMessageChannelPair(String sqlQuery, long transactionId) throws Exception
+   {
+      if (trace) log.trace("loading message and channel ids for tx [" + transactionId + "]");
+
+      Connection conn = null;
+      PreparedStatement ps = null;
+      ResultSet rs = null;
+      TransactionWrapper wrap = new TransactionWrapper();
+
+      try
+      {
+         conn = ds.getConnection();
+
+         ps = conn.prepareStatement(sqlQuery);
+
+         ps.setLong(1, transactionId);
+
+         rs = ps.executeQuery();
+
+         Map messageChannelPair = new HashMap();
+
+
+         while(rs.next())
+         {
+        	   long messageId = rs.getLong(1);
+            long channelId = rs.getLong(2);
+
+            messageChannelPair.put(new Long(messageId), new Long(channelId));
+
+            if (trace) log.trace("Loaded MsgID: " + messageId + " and ChannelID: " + channelId);
+         }
+
+         List messages = getMessages(new ArrayList(messageChannelPair.keySet()));
+
+         Map returnSet = new HashMap();
+
+         for (Iterator iter = messages.iterator(); iter.hasNext(); )
+         {
+            Message msg = (Message)iter.next();
+            returnSet.put(msg, messageChannelPair.get(new Long(msg.getMessageID())));
+         }
+
+         return returnSet;
+      }
+      catch (Exception e)
+      {
+         wrap.exceptionOccurred();
+         throw e;
+      }
+      finally
+      {
+         if (rs != null)
+         {
+            try
+            {
+               rs.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         if (ps != null)
+         {
+            try
+            {
+               ps.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Throwable e)
+            {
+            }
+         }
+         wrap.end();
+      }
+   }
+
+
    // Inner classes -------------------------------------------------
    
    private class TransactionWrapper




More information about the jboss-cvs-commits mailing list