[jboss-cvs] JBoss Messaging SVN: r7865 - in branches/JBMESSAGING-1742: src/main/org/jboss/messaging/core/contract and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 23 08:04:13 EDT 2009
Author: gaohoward
Date: 2009-10-23 08:04:13 -0400 (Fri, 23 Oct 2009)
New Revision: 7865
Modified:
branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Log:
save work
Modified: branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2009-10-23 12:04:13 UTC (rev 7865)
@@ -84,7 +84,7 @@
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
- DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE CHANNEL_ID=?
+ DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2009-10-23 12:04:13 UTC (rev 7865)
@@ -87,6 +87,13 @@
void addTransaction(Transaction tx);
+ //drop all messages that belong to a channel.
+ void dropChannelMessages(long channelID) throws Exception;
+
+ //merge messages from one channel to another.
+ void mergeChannelMessage(long fromID, long toID) throws Exception;
+
+
// Interface value classes ----------------------------------------------------------------------
class MessageChannelPair
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java 2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java 2009-10-23 12:04:13 UTC (rev 7865)
@@ -96,4 +96,6 @@
Delivery handleMove(MessageReference ref, long sourceChannelID);
void setClustered(boolean isClustered);
+
+ void staticMerge(Queue queue);
}
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2009-10-23 12:04:13 UTC (rev 7865)
@@ -1134,6 +1134,169 @@
}
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.contract.PersistenceManager#dropChannelMessages(long)
+ */
+ public void dropChannelMessages(final long channelID) throws Exception
+ {
+ class ChannelDropRunner extends JDBCTxRunner2
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ PreparedStatement ps2 = null;
+ ResultSet rs = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+ ps.setLong(1, channelID);
+ rs = ps.executeQuery();
+ int rows;
+
+ ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+ while (rs.next())
+ {
+ long mid = rs.getLong(1);
+ ps2.setLong(1, mid);
+ ps2.executeUpdate();
+ }
+ ps2.close();
+
+ ps.close();
+
+ ps = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+ ps.setLong(1, channelID);
+ rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Update page ord updated " + rows + " rows");
+ }
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeStatement(ps2);
+ }
+ return null;
+ }
+ }
+
+ new ChannelDropRunner().executeWithRetry();
+ }
+
+ /* (non-Javadoc)
+ * load messages from the channel (fromID) in the DB and
+ * add the messages to the channel (toID)
+ * @see org.jboss.messaging.core.contract.PersistenceManager#mergeChannelMessage(long, long)
+ */
+ public void mergeChannelMessage(final long fromID, final long toID) throws Exception
+ {
+
+ if (fromID == toID) { throw new IllegalArgumentException(
+ "Cannot merge queues - they have the same channel id!!"); }
+
+ class ChannelMergeRunner extends JDBCTxRunner2
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ PreparedStatement ps2 = null;
+
+ try
+ {
+ //first get max page order of toID channel
+ ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
+
+ ps.setLong(1, toID);
+
+ rs = ps.executeQuery();
+
+ rs.next();
+
+ Long maxOrdering = new Long(rs.getLong(2));
+
+ long pageCount = 0;
+
+ if (rs.wasNull())
+ {
+ maxOrdering = null;
+ }
+ else
+ {
+ //If maxOrdering is not null, update the page order
+ pageCount = maxOrdering + 1;
+ }
+
+ rs.close();
+
+ ps.close();
+
+ if (pageCount > 0)
+ {
+ //update paging
+ ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+
+ ps2 = conn
+ .prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+ ps.setLong(1, fromID);
+
+ rs = ps.executeQuery();
+
+ while (rs.next())
+ {
+ long msgId = rs.getLong(1);
+
+ ps2.setLong(1, pageCount);
+
+ ps2.setLong(2, msgId);
+
+ ps2.setLong(3, fromID);
+
+ int rows = ps2.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Update page ord updated " + rows + " rows");
+ }
+
+ pageCount++;
+ }
+ ps2.close();
+ ps.close();
+ }
+
+ //update channel id
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+ ps.setLong(1, toID);
+
+ ps.setLong(2, fromID);
+
+ int rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Update channel id updated " + rows + " rows");
+ }
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeStatement(ps2);
+ }
+ return null;
+ }
+ }
+
+ new ChannelMergeRunner().executeWithRetry();
+ }
+
public InitialLoadInfo mergeAndLoad(final long fromChannelID,
final long toChannelID, final int numberToLoad,
final long firstPagingOrder, final long nextPagingOrder)
@@ -2668,8 +2831,6 @@
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("DELETE_MESSAGE",
"DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
- map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
- map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE CHANNEL_ID=?");
// Transaction
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2009-10-23 12:04:13 UTC (rev 7865)
@@ -201,6 +201,13 @@
return remoteDistributor;
}
+ public void staticMerge(Queue fromQueue)
+ {
+ if (trace) { log.trace("Merging queue " + channelID + " into " + this +
+ " statically."); }
+ pm.mergeChannelMessage(fromQueue.getChannelID(), channelID);
+ }
+
/**
* Merge the contents of one queue with another - this happens at failover when a queue is failed
* over to another node, but a queue with the same name already exists. In this case we merge the
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-23 12:04:13 UTC (rev 7865)
@@ -783,7 +783,7 @@
}
else
{
- mergeDBChannelMessages(allBindings, b.queue.getChannelID());
+ mergeDBChannelMessages(allBindings, b.queue);
}
}
@@ -814,97 +814,35 @@
{
return;
}
- class RemoveChannelMessages extends JDBCTxRunner<Object>
- {
- public Object doTransaction() throws Exception
- {
- PreparedStatement ps1 = null, ps2 = null;
+ Iterator itBindings = allBindings.iterator();
- try
- {
- ps1 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
- ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
-
- Iterator itBindings = allBindings.iterator();
-
- while (itBindings.hasNext())
- {
- Binding bd = (Binding)itBindings.next();
- long channelID = bd.queue.getChannelID();
-
- ps1.setLong(1, channelID);
-
- int rows1 = ps1.executeUpdate();
-
- ps2.setLong(1, channelID);
-
- int rows2 = ps2.executeUpdate();
-
- if (trace)
- {
- log.trace(rows1 + " rows deleted from channel " + channelID);
- }
- }
-
- return null;
- }
- finally
- {
- closeStatement(ps1);
- closeStatement(ps2);
- }
- }
- }
-
- new RemoveChannelMessages().executeWithRetry();
-
+ while (itBindings.hasNext())
+ {
+ Binding bd = (Binding)itBindings.next();
+ long channelID = bd.queue.getChannelID();
+ pm.dropChannelMessages(channelID);
+ }
}
- private void mergeDBChannelMessages(final Collection allBindings, final long toChannelID) throws Exception
+ private void mergeDBChannelMessages(final Collection allBindings, Queue localQueue) throws Exception
{
if (ds == null)
{
return;
}
- class MergeChannelMessages extends JDBCTxRunner<Object>
+
+ //do a staticMerge
+ Iterator itBindings = allBindings.iterator();
+
+ while (itBindings.hasNext())
{
- public Object doTransaction() throws Exception
+ Binding bd = (Binding)itBindings.next();
+ long fromChannelID = bd.queue.getChannelID();
+ if (fromChannelID != localQueue.getChannelID())
{
- PreparedStatement ps = null;
-
- try
- {
- ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
-
- Iterator itBindings = allBindings.iterator();
-
- ps.setLong(1, toChannelID);
-
- while (itBindings.hasNext())
- {
- Binding bd = (Binding)itBindings.next();
- long fromChannelID = bd.queue.getChannelID();
-
- ps.setLong(2, fromChannelID);
-
- int rows = ps.executeUpdate();
-
- if (trace)
- {
- log.trace(rows + " rows updated from channel " + fromChannelID + " to channel: " + toChannelID);
- }
- }
-
- return null;
- }
- finally
- {
- closeStatement(ps);
- }
+ localQueue.staticMerge(bd.queue);
}
}
-
- new MergeChannelMessages().executeWithRetry();
}
public void injectServerPeer(ServerPeer serverPeer)
Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java 2009-10-23 07:17:50 UTC (rev 7864)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java 2009-10-23 12:04:13 UTC (rev 7865)
@@ -77,6 +77,7 @@
protected void setUp() throws Exception
{
+ super.setUp();
sc = new ServiceContainer("transaction");
//Don't drop the tables again!
@@ -152,7 +153,7 @@
//Restart nodes
for (int i = 0; i < nodeCount; i++)
{
- startDefaultServer(i, overrides, i == 0);
+ startDefaultServer(i, overrides, false);
}
//redeploy
@@ -179,6 +180,11 @@
producer.send(msg);
}
}
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
finally
{
conn.close();
More information about the jboss-cvs-commits
mailing list