[jboss-cvs] JBoss Messaging SVN: r8614 - in branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss: messaging/core/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 20 02:20:53 EDT 2013
Author: bershath27
Date: 2013-09-20 02:20:53 -0400 (Fri, 20 Sep 2013)
New Revision: 8614
Modified:
branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
Log:
JBPAPP-10865
Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java 2013-09-20 05:04:30 UTC (rev 8613)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/jms/server/ServerPeer.java 2013-09-20 06:20:53 UTC (rev 8614)
@@ -274,6 +274,7 @@
{
((JDBCPersistenceManager)persistenceManager).injectNodeID(serverPeerID);
((JDBCPersistenceManager)persistenceManager).setStopServerPeerOnDBFailure(this.stopServerPeerOnDBFailure);
+ ((JDBCPersistenceManager)persistenceManager).setServerPeer(this);
}
else if (persistenceManager instanceof NullPersistenceManager)
{
Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2013-09-20 05:04:30 UTC (rev 8613)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2013-09-20 06:20:53 UTC (rev 8614)
@@ -22,11 +22,14 @@
package org.jboss.messaging.core.impl;
import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.tx.MessagingXid;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.impl.message.MessageFactory;
import org.jboss.messaging.core.impl.message.MessageSupport;
import org.jboss.messaging.core.impl.tx.PreparedTxInfo;
@@ -106,16 +109,12 @@
private boolean supportsTxAge;
- private int maxRetry = 25;
-
- private int retryInterval = 1000;
-
- private boolean retryOnConnectionFailure = false;
-
private volatile boolean stopServerPeerOnDBFailure = false;
private Object moveLock = new Object();
+ private ServerPeer server;
+
// Constructors --------------------------------------------------
public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -1192,6 +1191,14 @@
public Object doTransaction() throws Exception
{
PreparedStatement statement = null;
+ PreparedStatement statement1 = null;
+ PreparedStatement statement2 = null;
+ PreparedStatement statement3 = null;
+ PreparedStatement statement4 = null;
+ ResultSet rs = null;
+ ResultSet rs1 = null;
+ ResultSet rs2 = null;
+
try
{
statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
@@ -1199,14 +1206,106 @@
statement.setInt(2, fromNodeID);
int affected = statement.executeUpdate();
+ statement.close();
+ statement = null;
+
log.debug("Merged " + affected + " transactions from channel "
+ fromNodeID + " into node " + toNodeID);
+
+ //The following code is for JBMESSAGING-1940
+ if (affected > 0)
+ {
+ //check non-movable transactions
+ statement1 = conn.prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
+ statement1.setInt(1, nodeID);
+
+ rs = statement1.executeQuery();
+
+ List<Long> nonMovableTxs = new ArrayList<Long>();
+
+ //check '+' messages, if binding is missing, don't merge this tx
+ String sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_REF");
+ statement2 = conn.prepareStatement(sql);
+
+ //check '-' messages, if binding is missing, don't merge this tx
+ sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_ACK");
+ statement3 = conn.prepareStatement(sql);
+
+ PostOffice postOffice = server.getPostOfficeInstance();
+
+ while (rs.next())
+ {
+ boolean found = false;
+
+ long txId = rs.getLong(1);
+
+ statement2.setLong(1, txId);
+
+ rs1 = statement2.executeQuery();
+
+ while (rs1.next())
+ {
+ long channelId = rs1.getLong(2);
+ Binding binding = postOffice.getBindingForChannelID(channelId);
+ if (binding == null)
+ {
+ nonMovableTxs.add(txId);
+ found = true;
+ break;
+ }
+ }
+
+ rs1.close();
+ rs1 = null;
+ if (found) continue;
+
+ statement3.setLong(1, txId);
+
+ rs2 = statement3.executeQuery();
+
+ while (rs2.next())
+ {
+ long channelId = rs2.getLong(2);
+ Binding binding = postOffice.getBindingForChannelID(channelId);
+ if (binding == null)
+ {
+ nonMovableTxs.add(txId);
+ break;
+ }
+ }
+ rs2.close();
+ rs2 = null;
+ }
+
+ rs.close();
+ rs = null;
+ statement2.close();
+ statement2 = null;
+ statement3.close();
+ statement3 = null;
+
+ if (nonMovableTxs.size() > 0)
+ {
+ //put non-movable tx back
+ statement4 = conn.prepareStatement(getSQLStatement("MOVE_TX_NODE"));
+ for (Long nonMTxId : nonMovableTxs)
+ {
+ statement4.setInt(1, fromNodeID);
+ statement4.setLong(2, nonMTxId);
+ statement4.executeUpdate();
+ }
+ statement4.close();
+ statement4 = null;
+ log.debug("Moved transactions back: " + nonMovableTxs);
+ }
+ }
return null;
}
finally
{
- closeStatement(statement);
+ closeResultSet(rs, rs1, rs2);
+ closeStatement(statement, statement1, statement2, statement3, statement4);
}
}
}
@@ -3091,6 +3190,7 @@
map.put("SELECT_MESSAGE_ID_FOR_ACK",
"SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD");
map.put("UPDATE_TX", "UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?");
+ map.put("MOVE_TX_NODE", "UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?");
// Counter
map.put("UPDATE_COUNTER",
@@ -3613,4 +3713,9 @@
this.stopServerPeerOnDBFailure = crashServerOnDBFailure;
}
+ public void setServerPeer(ServerPeer server)
+ {
+ this.server = server;
+ }
+
}
Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2013-09-20 05:04:30 UTC (rev 8613)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2013-09-20 06:20:53 UTC (rev 8614)
@@ -252,39 +252,45 @@
return false;
}
- protected void closeResultSet(ResultSet rs)
+ protected void closeResultSet(ResultSet... rss)
{
- if (rs != null)
+ for (ResultSet rs : rss)
{
- try
+ if (rs != null)
{
- rs.close();
+ try
+ {
+ rs.close();
+ }
+ catch (Throwable e)
+ {
+ if (trace)
+ {
+ log.trace("Failed to close result set", e);
+ }
+ }
}
- catch (Throwable e)
- {
- if (trace)
- {
- log.trace("Failed to close result set", e);
- }
- }
}
}
- protected void closeStatement(Statement st)
+ protected void closeStatement(Statement... sts)
{
- if (st != null)
+ for (Statement st : sts)
{
- try
+ if (st != null)
{
- st.close();
+ try
+ {
+ st.close();
+ }
+ catch (Throwable e)
+ {
+ if (trace)
+ {
+ log.trace("Failed to close statement", e);
+ }
+ }
}
- catch (Throwable e)
- {
- if (trace)
- {
- log.trace("Failed to close statement", e);
- }
- }
}
}
More information about the jboss-cvs-commits
mailing list