[jboss-cvs] JBoss Messaging SVN: r1744 - branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Dec 9 13:53:58 EST 2006
Author: juha at jboss.org
Date: 2006-12-09 13:53:48 -0500 (Sat, 09 Dec 2006)
New Revision: 1744
Modified:
branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
Log:
Rewrite of patch JBMESSAGING-407. Simplifying requirements to PersistenceManager interface and removing unneeded methods.
Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-12-09 18:50:10 UTC (rev 1743)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-12-09 18:53:48 UTC (rev 1744)
@@ -214,15 +214,16 @@
protected String selectAllChannels = "SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE";
//RECOVERY --MK
- protected String selectMessageIdForRef = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+ protected String selectMessageIdForRef = "SELECT MESSAGEID, CHANNELID FROM JMS_MESSAGE_REFERENCE "
+ "WHERE TRANSACTIONID = ? AND STATE = '+' ";
- protected String selectMessageIdForAck = "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE "
+ protected String selectMessageIdForAck = "SELECT MESSAGEID, CHANNELID FROM JMS_MESSAGE_REFERENCE "
+ "WHERE TRANSACTIONID = ? AND STATE = '-' ";
- protected String selectChannelId = "SELECT CHANNELLID FROM JMS_MESSAGE_REFERENCE "
- + "WHERE TRANSACTIONID = ?";
+
+
+
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -252,7 +253,8 @@
protected int minOrdering;
protected Map channelMultipliers;
-
+
+
// Constructors --------------------------------------------------
public JDBCPersistenceManager() throws Exception
@@ -283,6 +285,7 @@
channelMultipliers = new ConcurrentReaderHashMap();
}
+
// ServiceMBeanSupport overrides ---------------------------------
protected void startService() throws Exception
@@ -347,7 +350,8 @@
{
log.debug(this + " stopped");
}
-
+
+
// PersistenceManager implementation -------------------------
public Object getInstance()
@@ -605,117 +609,21 @@
}
}
- public ChannelMapper getChannelMapperReference()
+ public ChannelMapper getChannels()
{
return cm;
}
- public long getMessageIdForRef(long transactionId) throws Exception
+ public Map getMessageChannelPairForTx(long transactionId) throws Exception
{
- return getId(selectMessageIdForRef, transactionId, 1);
+ return getMessageChannelPair(selectMessageIdForRef, transactionId);
}
- public long getMessageIdForAck(long transactionId) throws Exception
+ public List getAcksForTx(long transactionId) throws Exception
{
- return getId(selectMessageIdForAck, transactionId, 1);
+ return new ArrayList(getMessageChannelPair(selectMessageIdForAck, transactionId).keySet());
}
- public long getChannelId(long transactionId) throws Exception
- {
- return getId(selectChannelId, transactionId, 1);
- }
-
- /**
- * This method associates a TransactionCallback to the
- * resurrected transaction. This method is called from
- * the TransactionRepository once a transaction object
- * is resurrected.
- */
- public void associateTxCallbackToPreparedTx(Transaction tx)
- {
- TransactionCallback callback = (TransactionCallback) tx.getKeyedCallback(this);
-
- if (callback == null)
- {
- callback = new TransactionCallback(tx);
-
- tx.addKeyedCallback(callback, this);
- }
- }
-
- /**
- * MK
- * Load the message refs for the corresponding tx id
- */
- public long getId(String sqlQuery, long transactionId, int index) throws Exception
- {
- if (trace) { log.trace("loading references for tx [" + transactionId + "]"); }
-
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- TransactionWrapper wrap = new TransactionWrapper();
- long messageId = 0;
-
- try
- {
- conn = ds.getConnection();
-
- ps = conn.prepareStatement(sqlQuery);
-
- ps.setLong(1, transactionId);
-
- rs = ps.executeQuery();
-
- while(rs.next())
- {
- messageId = rs.getLong(index);
- }
-
- return messageId;
- }
- catch (Exception e)
- {
- wrap.exceptionOccurred();
- throw e;
- }
- finally
- {
- if (rs != null)
- {
- try
- {
- rs.close();
- }
- catch (Throwable e)
- {
- }
- }
- if (ps != null)
- {
- try
- {
- ps.close();
- }
- catch (Throwable e)
- {
- }
- }
- if (conn != null)
- {
- try
- {
- conn.close();
- }
- catch (Throwable e)
- {
- }
- }
- wrap.end();
- }
- }
-
-
/*
* Retrieve a List of messages corresponding to the specified List of message ids.
* The implementation here for HSQLDB does this by using a PreparedStatment with an IN clause
@@ -2602,9 +2510,12 @@
selectAllChannels = sqlProperties.getProperty("SELECT_ALL_CHANNELS", selectAllChannels);
//recovery
- selectMessageIdForRef = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_REFS",selectMessageIdForRef);
- selectMessageIdForAck = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_ACKS",selectMessageIdForAck);
- selectChannelId = sqlProperties.getProperty("SELECT_CHANNEL_ID",selectChannelId );
+
+ // TODO: add props to config files [JPL]
+ selectMessageIdForRef = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_REFS", selectMessageIdForRef);
+ selectMessageIdForAck = sqlProperties.getProperty("SELECT_MESSAGEID_FOR_ACKS", selectMessageIdForAck);
+
+
}
protected TransactionCallback getCallback(Transaction tx)
@@ -4570,7 +4481,94 @@
return tm;
}
-
+
+
+ private Map getMessageChannelPair(String sqlQuery, long transactionId) throws Exception
+ {
+ if (trace) log.trace("loading message and channel ids for tx [" + transactionId + "]");
+
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ TransactionWrapper wrap = new TransactionWrapper();
+
+ try
+ {
+ conn = ds.getConnection();
+
+ ps = conn.prepareStatement(sqlQuery);
+
+ ps.setLong(1, transactionId);
+
+ rs = ps.executeQuery();
+
+ Map messageChannelPair = new HashMap();
+
+
+ while(rs.next())
+ {
+ long messageId = rs.getLong(1);
+ long channelId = rs.getLong(2);
+
+ messageChannelPair.put(new Long(messageId), new Long(channelId));
+
+ if (trace) log.trace("Loaded MsgID: " + messageId + " and ChannelID: " + channelId);
+ }
+
+ List messages = getMessages(new ArrayList(messageChannelPair.keySet()));
+
+ Map returnSet = new HashMap();
+
+ for (Iterator iter = messages.iterator(); iter.hasNext(); )
+ {
+ Message msg = (Message)iter.next();
+ returnSet.put(msg, messageChannelPair.get(new Long(msg.getMessageID())));
+ }
+
+ return returnSet;
+ }
+ catch (Exception e)
+ {
+ wrap.exceptionOccurred();
+ throw e;
+ }
+ finally
+ {
+ if (rs != null)
+ {
+ try
+ {
+ rs.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ if (ps != null)
+ {
+ try
+ {
+ ps.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ if (conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable e)
+ {
+ }
+ }
+ wrap.end();
+ }
+ }
+
+
// Inner classes -------------------------------------------------
private class TransactionWrapper
More information about the jboss-cvs-commits
mailing list