[jboss-cvs] JBoss Messaging SVN: r7860 - in branches/JBMESSAGING-1742: src/main/org/jboss/jms/server/destination and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 21 06:01:55 EDT 2009
Author: gaohoward
Date: 2009-10-21 06:01:54 -0400 (Wed, 21 Oct 2009)
New Revision: 7860
Modified:
branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java
branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java
branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
Log:
first commit
Modified: branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2009-10-21 10:01:54 UTC (rev 7860)
@@ -83,6 +83,8 @@
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
+ DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
+ DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE CHANNEL_ID=?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -632,6 +632,16 @@
protected abstract boolean isQueue();
+ public void setDropOldMessageOnRedeploy(boolean dropOldMessageOnRedeploy)
+ {
+ destination.setDropOldMessageOnRedeploy(dropOldMessageOnRedeploy);
+ }
+
+ public boolean isDropOldMessageOnRedeploy()
+ {
+ return destination.isDropOldMessageOnRedeploy();
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -90,6 +90,8 @@
protected int messageCounterHistoryDayLimit = -1;
protected int maxDeliveryAttempts = -1;
+
+ protected boolean dropOldMessageOnRedeploy;
public ManagedDestination()
{
@@ -318,4 +320,14 @@
{
//NOOP
}
+
+ public void setDropOldMessageOnRedeploy(boolean drop)
+ {
+ dropOldMessageOnRedeploy = drop;
+ }
+
+ public boolean isDropOldMessageOnRedeploy()
+ {
+ return dropOldMessageOnRedeploy;
+ }
}
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/QueueService.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -87,20 +87,23 @@
//Sanity check - currently it is not possible to change the clustered attribute of a destination
//See http://jira.jboss.org/jira/browse/JBMESSAGING-1235
+ //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ if (po.isClustered())
+ {
+ if (destination.isClustered() != queue.isClustered())
+ {
+
+ log.warn("Queue " + destination.getName()
+ + " previous clustered attribute is " + queue.isClustered()
+ + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+
+ queue = po.convertDestination(destination);
+ }
+ }
- boolean actuallyClustered = po.isClustered() && destination.isClustered();
-
- if (actuallyClustered != queue.isClustered())
- {
- throw new IllegalArgumentException("Queue " + destination.getName() + " is already deployed as clustered = " +
- queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
- " . You must delete the destination first before redeploying");
-
- }
-
queue.setPagingParams(destination.getFullSize(),
- destination.getPageSize(),
- destination.getDownCacheSize());
+ destination.getPageSize(),
+ destination.getDownCacheSize());
queue.load();
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/jms/server/destination/TopicService.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -75,15 +75,17 @@
{
Queue queue = (Queue)iter.next();
- boolean actuallyClustered = po.isClustered() && destination.isClustered();
-
- if (actuallyClustered != queue.isClustered())
- {
- throw new IllegalArgumentException("Topic " + destination.getName() + " is already deployed as clustered = " +
- queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
- " . You must delete the destination first before redeploying");
-
- }
+ //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ if (po.isClustered())
+ {
+ if (destination.isClustered() != queue.isClustered())
+ {
+ log.warn("Topic " + destination.getName()
+ + " previous clustered attribute is " + queue.isClustered()
+ + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+ queue = po.convertDestination(destination);
+ }
+ }
//TODO We need to set the paging params this way since the post office doesn't store them
//instead we should never create queues inside the postoffice - only do it at deploy time
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/PostOffice.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
+import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.messaging.core.impl.tx.Transaction;
/**
@@ -155,5 +156,10 @@
Map getRecoveryArea(String queueName);
int getRecoveryMapSize(String queueName);
+
+ /**
+ * change the destination's clustered state.
+ */
+ Queue convertDestination(ManagedDestination destination) throws Throwable;
}
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/contract/Queue.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -94,4 +94,6 @@
//Optimisation for shared database
Delivery handleMove(MessageReference ref, long sourceChannelID);
+
+ void setClustered(boolean isClustered);
}
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -2668,6 +2668,10 @@
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("DELETE_MESSAGE",
"DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+ map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
+ map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE CHANNEL_ID=?");
+
+
// Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -708,4 +708,9 @@
}
}
}
+
+ public void setClustered(boolean isClustered)
+ {
+ clustered = isClustered;
+ }
}
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -49,7 +49,9 @@
import javax.transaction.TransactionManager;
import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.server.JMSCondition;
import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
@@ -484,7 +486,7 @@
return added;
}
-
+
public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
{
Binding binding = internalRemoveBinding(queueName, allNodes, true);
@@ -755,7 +757,157 @@
}
}
- public void injectServerPeer(ServerPeer serverPeer)
+ /*
+ * Convert an existing destination:
+ * if it is clustered, change it to be non-clustered.
+ * if it is non-clustered, change it to be clustered.
+ *
+ * Note: this method should be called during the destination loading time, i.e.
+ * the queue is not activated yet. Only applicable to clustered post office.
+ */
+ public Queue convertDestination(ManagedDestination newDest) throws Throwable
+ {
+ Binding b = getBindingForQueueName(newDest.getName());
+ if (b.queue.isActive())
+ {
+ throw new IllegalStateException(this + " cannot convert the destination " + b.queue + " because it is in active state");
+ }
+
+ //if a destination changes from clustered to standalone, collect the messages from all channels
+ if ( !newDest.isClustered() )
+ {
+ Collection allBindings = getAllBindingsForQueueName(newDest.getName());
+ if (newDest.isDropOldMessageOnRedeploy())
+ {
+ removeDBChannelMessages(allBindings);
+ }
+ else
+ {
+ mergeDBChannelMessages(allBindings, b.queue.getChannelID());
+ }
+ }
+
+ //if the old destination is clustered, we need to remove from all nodes (queue and durable subs)
+ b = removeBinding(newDest.getName(), !newDest.isClustered());
+
+ b.queue.setClustered(newDest.isClustered());
+
+ if (newDest.isQueue())
+ {
+ JMSCondition queueCond = new JMSCondition(true, newDest.getName());
+ addBinding(new Binding(queueCond, b.queue, false), false);
+ }
+ else
+ {
+ b.queue.setClustered(newDest.isClustered());
+
+ JMSCondition queueCond = new JMSCondition(false, newDest.getName());
+ addBinding(new Binding(queueCond, b.queue, newDest.isClustered()), newDest.isClustered());
+ }
+
+ return b.queue;
+ }
+
+ private void removeDBChannelMessages(final Collection allBindings) throws Exception
+ {
+ if (ds == null)
+ {
+ return;
+ }
+ class RemoveChannelMessages extends JDBCTxRunner<Object>
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps1 = null, ps2 = null;
+
+ try
+ {
+ ps1 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+ ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+
+ Iterator itBindings = allBindings.iterator();
+
+ while (itBindings.hasNext())
+ {
+ Binding bd = (Binding)itBindings.next();
+ long channelID = bd.queue.getChannelID();
+
+ ps1.setLong(1, channelID);
+
+ int rows1 = ps1.executeUpdate();
+
+ ps2.setLong(1, channelID);
+
+ int rows2 = ps2.executeUpdate();
+
+ if (trace)
+ {
+ log.trace(rows1 + " rows deleted from channel " + channelID);
+ }
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(ps1);
+ closeStatement(ps2);
+ }
+ }
+ }
+
+ new RemoveChannelMessages().executeWithRetry();
+
+ }
+
+ private void mergeDBChannelMessages(final Collection allBindings, final long toChannelID) throws Exception
+ {
+ if (ds == null)
+ {
+ return;
+ }
+ class MergeChannelMessages extends JDBCTxRunner<Object>
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+ Iterator itBindings = allBindings.iterator();
+
+ ps.setLong(1, toChannelID);
+
+ while (itBindings.hasNext())
+ {
+ Binding bd = (Binding)itBindings.next();
+ long fromChannelID = bd.queue.getChannelID();
+
+ ps.setLong(2, fromChannelID);
+
+ int rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace(rows + " rows updated from channel " + fromChannelID + " to channel: " + toChannelID);
+ }
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(ps);
+ }
+ }
+ }
+
+ new MergeChannelMessages().executeWithRetry();
+ }
+
+ public void injectServerPeer(ServerPeer serverPeer)
{
this.serverPeer = serverPeer;
}
@@ -2620,7 +2772,6 @@
groupMember.unicastData(request, address);
}
-
private Map getBindingsFromStorage() throws Exception
{
if (ds == null)
Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2009-10-21 09:07:29 UTC (rev 7859)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2009-10-21 10:01:54 UTC (rev 7860)
@@ -392,6 +392,10 @@
{
return null;
}
+
+ public void setClustered(boolean isClustered)
+ {
+ }
}
// Inner classes -------------------------------------------------
More information about the jboss-cvs-commits
mailing list