[jboss-cvs] JBoss Messaging SVN: r1549 - in branches/Branch_Client_Failover_Experiment/src: etc/server/default/deploy main/org/jboss/messaging/core/plugin/postoffice main/org/jboss/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 2 19:21:29 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-02 19:21:22 -0500 (Thu, 02 Nov 2006)
New Revision: 1549
Modified:
branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - Implementing field failed into Binding, BindingInfo and clustered related operations
Modified: branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-11-03 00:21:22 UTC (rev 1549)
@@ -67,10 +67,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, IS_FAILED_OVER VARCHAR(1))
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER) VALUES (?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
<attribute name="GroupName">Queue</attribute>
<attribute name="StateTimeout">5000</attribute>
@@ -131,10 +131,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, IS_FAILED_OVER VARCHAR(1))
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER) VALUES (?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, IS_FAILED_OVER FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
<attribute name="GroupName">Topic</attribute>
<attribute name="StateTimeout">5000</attribute>
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -39,4 +39,9 @@
public String getCondition();
public Queue getQueue();
+
+ public boolean isFailed();
+
+ public void setFailed(boolean failed);
+
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultBinding.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -40,35 +40,49 @@
private String condition;
private Queue queue;
-
+
+ private boolean failed;
+
public DefaultBinding()
- {
+ {
}
- public DefaultBinding(int nodeId, String condition, Queue queue)
+ public DefaultBinding(int nodeId, String condition, Queue queue, boolean failed)
{
this.nodeId = nodeId;
-
- this.condition = condition;
-
+
+ this.condition = condition;
+
this.queue = queue;
+
+ this.failed = failed;
}
-
+
public int getNodeId()
{
return nodeId;
}
-
+
public String getCondition()
{
return condition;
}
-
+
public Queue getQueue()
{
return queue;
}
+ public boolean isFailed()
+ {
+ return failed;
+ }
+
+ public void setFailed(boolean failed)
+ {
+ this.failed = failed;
+ }
+
public String toString()
{
return "Node" + nodeId + " condition=" + condition + " queue=" + queue + " queueClass=" + queue.getClass().getName();
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -188,7 +188,7 @@
throw new IllegalArgumentException("Binding already exists for name " + queue.getName());
}
- binding = new DefaultBinding(nodeId, condition, queue);
+ binding = new DefaultBinding(nodeId, condition, queue, false);
addBinding(binding);
@@ -497,9 +497,11 @@
long channelId = rs.getLong(5);
+ boolean failed = rs.getString(6).equals("Y");
+
log.info("PostOffice " + this.officeName + " nodeId=" + nodeId + " condition=" + condition + " queueName=" + queueName + " channelId=" + channelId + " selector=" + selector);
- Binding binding = this.createBinding(nodeId, condition, queueName, channelId, selector, true);
+ Binding binding = this.createBinding(nodeId, condition, queueName, channelId, selector, true, failed);
binding.getQueue().deactivate();
addBinding(binding);
@@ -525,15 +527,15 @@
}
}
- protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, String filterString, boolean durable) throws Exception
+ protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, String filterString, boolean durable, boolean failed) throws Exception
{
Filter filter = filterFactory.createFilter(filterString);
- return createBinding(nodeId, condition, queueName, channelId, filter, durable);
+ return createBinding(nodeId, condition, queueName, channelId, filter, durable, failed);
}
- protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable)
+ protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
{
Queue queue;
if (nodeId == this.nodeId)
@@ -548,7 +550,7 @@
throw new IllegalStateException("This is a non clustered post office - should not have bindings from different nodes!");
}
- Binding binding = new DefaultBinding(nodeId, condition, queue);
+ Binding binding = new DefaultBinding(nodeId, condition, queue, failed);
return binding;
@@ -581,6 +583,7 @@
ps.setNull(5, Types.VARCHAR);
}
ps.setLong(6, binding.getQueue().getChannelID());
+ ps.setString(7,binding.isFailed()?"Y":"N");
ps.executeUpdate();
}
@@ -820,12 +823,12 @@
{
Map map = new LinkedHashMap();
map.put("INSERT_BINDING",
- "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) " +
- "VALUES (?, ?, ?, ?, ?, ?)");
+ "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID, IS_FAILED_OVER) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?)");
map.put("DELETE_BINDING",
"DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?");
map.put("LOAD_BINDINGS",
- "SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE " +
+ "SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID, IS_FAILED_OVER FROM JMS_POSTOFFICE " +
"WHERE POSTOFFICE_NAME = ?");
return map;
}
@@ -836,7 +839,7 @@
map.put("CREATE_POSTOFFICE_TABLE",
"CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID INTEGER," +
"QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), " +
- "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)");
+ "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, IS_FAILED_OVER VARCHAR(1))");
return map;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -44,16 +44,17 @@
}
BindRequest(int nodeId, String queueName, String condition, String filterString,
- long channelId, boolean durable)
+ long channelId, boolean durable, boolean failed)
{
bindingInfo = new BindingInfo(nodeId, queueName, condition, filterString,
- channelId, durable);
+ channelId, durable, failed);
}
Object execute(PostOfficeInternal office) throws Exception
{
office.addBindingFromCluster(bindingInfo.getNodeId(), bindingInfo.getQueueName(), bindingInfo.getCondition(),
- bindingInfo.getFilterString(), bindingInfo.getChannelId(), bindingInfo.isDurable());
+ bindingInfo.getFilterString(), bindingInfo.getChannelId(), bindingInfo.isDurable(),
+ bindingInfo.isFailed());
return null;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindingInfo.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -49,13 +49,15 @@
private long channelId;
private boolean durable;
+
+ private boolean failed;
BindingInfo()
{
}
BindingInfo(int nodeId, String queueName, String condition, String filterString,
- long channelId, boolean durable)
+ long channelId, boolean durable, boolean failed)
{
this.nodeId = nodeId;
@@ -68,12 +70,14 @@
this.channelId = channelId;
this.durable = durable;
+
+ this.failed = failed;
}
public void execute(PostOfficeInternal office) throws Exception
{
office.addBindingFromCluster(nodeId, queueName, condition,
- filterString, channelId, durable);
+ filterString, channelId, durable, failed);
}
@@ -136,4 +140,14 @@
{
return queueName;
}
+
+ public boolean isFailed()
+ {
+ return failed;
+ }
+
+ public void setFailed(boolean failed)
+ {
+ this.failed = failed;
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -38,5 +38,5 @@
{
List getQueues();
- ClusteredQueue[] getLocalQueue();
+ ClusteredQueue getLocalQueue();
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -332,31 +332,37 @@
}
Binding binding = (Binding)super.bindQueue(condition, queue);
-
- BindRequest request =
- new BindRequest(this.nodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
- binding.getQueue().getChannelID(), queue.isRecoverable());
-
- syncSendRequest(request);
-
+
+ sendBindRequest(condition, queue, binding);
+
return binding;
}
-
- public Binding unbindClusteredQueue(String queueName) throws Throwable
+
+ private void sendBindRequest(String condition, LocalClusteredQueue queue, Binding binding)
+ throws Exception
{
- if (trace)
- {
- log.trace(this.nodeId + " unbind clustered queue: " + queueName);
- }
-
- Binding binding = (Binding)super.unbindQueue(queueName);
-
- UnbindRequest request = new UnbindRequest(this.nodeId, queueName);
-
+ BindRequest request =
+ new BindRequest(this.nodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
+ binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
+
syncSendRequest(request);
-
- return binding;
}
+
+ public Binding unbindClusteredQueue(String queueName) throws Throwable
+ {
+ if (trace)
+ {
+ log.trace(this.nodeId + " unbind clustered queue: " + queueName);
+ }
+
+ Binding binding = (Binding)super.unbindQueue(queueName);
+
+ UnbindRequest request = new UnbindRequest(this.nodeId, queueName);
+
+ syncSendRequest(request);
+
+ return binding;
+ }
public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
{
@@ -547,7 +553,7 @@
* Called when another node adds a binding
*/
public void addBindingFromCluster(int nodeId, String queueName, String condition,
- String filterString, long channelID, boolean durable)
+ String filterString, long channelID, boolean durable, boolean failed)
throws Exception
{
lock.writeLock().acquire();
@@ -581,7 +587,7 @@
throw new IllegalArgumentException(this.nodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
}
- binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable);
+ binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable, failed);
addBinding(binding);
}
@@ -974,12 +980,10 @@
//Maybe the local queue now wants to pull message(s) from the remote queue given that the
//stats for the remote queue have changed
- LocalClusteredQueue localQueueArray[] = (LocalClusteredQueue[])router.getLocalQueue();
+ LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
- // TODO: Verify what to do with this array of local queues, since we used to have only one localQueue
- if (localQueueArray.length>0)
+ if (localQueue!=null)
{
- LocalClusteredQueue localQueue = localQueueArray[0];
//TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
@@ -1095,47 +1099,59 @@
public void failOver(int nodeId) throws Exception
{
- log.info("Preparing failover against node " + nodeId);
- Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
- ArrayList namesToRemove = new ArrayList();
- for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterNames.next();
- Binding binding = (Binding )entry.getValue();
- if (binding.getQueue().isClustered())
- {
- ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
- if (!queue.isLocal())
- {
- namesToRemove.add(entry);
- }
- }
- }
+ log.info("Preparing failover against node " + nodeId);
+ Map subMaps = (Map)nameMaps.get(new Integer(nodeId));
+ ArrayList namesToRemove = new ArrayList();
+ for (Iterator iterNames = subMaps.entrySet().iterator();iterNames.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)iterNames.next();
+ Binding binding = (Binding )entry.getValue();
+ if (binding.getQueue().isClustered())
+ {
+ ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
+ if (!queue.isLocal())
+ {
+ namesToRemove.add(entry);
+ }
+ }
+ }
- for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterNames.next();
- Binding binding = (Binding)entry.getValue();
- RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
- this.removeBinding(nodeId,(String)entry.getKey());
+ for (Iterator iterNames = namesToRemove.iterator();iterNames.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)iterNames.next();
+ Binding binding = (Binding)entry.getValue();
+ RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+ this.removeBinding(nodeId,(String)entry.getKey());
- this.deleteBinding(nodeId,(String)entry.getKey());
+ this.deleteBinding(nodeId,(String)entry.getKey());
- // todo: remove Binding from cluster
+ UnbindRequest unbindRequest = new UnbindRequest(nodeId, stub.getName());
+ syncSendRequest(unbindRequest);
- Binding newBinding = this.createBinding(this.nodeId,binding.getCondition(),
- stub.getName(),stub.getChannelID(),
- stub.getFilter(),stub.isRecoverable());
+ // A failed over queue will have the flag failover, only if there isn't another local queue with the same name
+ // In case this node doesn't have that queue, we will simply assume the queue as nothing else had happened.
+ boolean failed = this.getBindingForQueueName(stub.getName())!=null;
- insertBinding(newBinding);
+ if (!failed)
+ {
+ log.info("The current node didn't have a queue " + stub.getName() + " so it's assuming the queue as a regular queue");
+ }
- LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
- clusteredQueue.deactivate();
- clusteredQueue.load();
- clusteredQueue.activate();
- addBinding(newBinding);
- // todo: send new Binding into Cluster
- }
+
+ Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
+ stub.getName(), stub.getChannelID(),
+ stub.getFilter(), stub.isRecoverable(), failed);
+
+ insertBinding(newBinding);
+
+ LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+ clusteredQueue.deactivate();
+ clusteredQueue.load();
+ clusteredQueue.activate();
+ addBinding(newBinding);
+ System.out.println("**** sending binding on " + binding.getQueue().getName() + " with condition=" + binding.getCondition());
+ sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+ }
}
@@ -1282,14 +1298,7 @@
}
}
- protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, String filterString, boolean durable) throws Exception
- {
- Filter filter = filterFactory.createFilter(filterString);
-
- return createBinding(nodeId, condition, queueName, channelId, filter, durable);
- }
-
- protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable)
+ protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
{
Queue queue;
if (nodeId == this.nodeId)
@@ -1304,7 +1313,7 @@
queue = new RemoteQueueStub(nodeId, queueName, channelId, durable, pm, filter);
}
- Binding binding = new DefaultBinding(nodeId, condition, queue);
+ Binding binding = new DefaultBinding(nodeId, condition, queue, failed);
return binding;
}
@@ -1424,7 +1433,8 @@
binding.getCondition(),
queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
queue.getChannelID(),
- queue.isRecoverable());
+ queue.isRecoverable(),
+ binding.isFailed());
bindings.add(info);
}
}
@@ -1458,7 +1468,8 @@
{
BindingInfo info = (BindingInfo)iter.next();
- Binding binding = this.createBinding(info.getNodeId(), info.getCondition(), info.getQueueName(), info.getChannelId(), info.getFilterString(), info.isDurable());
+ Binding binding = this.createBinding(info.getNodeId(), info.getCondition(), info.getQueueName(), info.getChannelId(),
+ info.getFilterString(), info.isDurable(),info.isFailed());
if (binding.getNodeId() == this.nodeId)
{
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -55,88 +55,81 @@
//MUST be an arraylist for fast index access
private ArrayList nonLocalQueues;
- private ArrayList localQueues;
+ private ClusteredQueue localQueue;
private int target;
-
+
public DefaultRouter()
{
nonLocalQueues = new ArrayList();
- localQueues= new ArrayList();
}
-
+
public int size()
{
- return nonLocalQueues.size() + localQueues.size();
+ return nonLocalQueues.size() + (localQueue == null ? 0 : 1);
}
-
- public ClusteredQueue[] getLocalQueue()
+
+ public ClusteredQueue getLocalQueue()
{
- return (ClusteredQueue[])localQueues.toArray(new LocalClusteredQueue[localQueues.size()]);
+ return localQueue;
}
public boolean add(Receiver receiver)
{
ClusteredQueue queue = (ClusteredQueue)receiver;
-
+
if (queue.isLocal())
{
- /**if (localQueue != null)
+ if (localQueue != null)
{
//throw new IllegalStateException("Already has local queue");
- log.warn("Already has LocalQueue " + localQueue);
- } */
- localQueues.add(queue);
+ return true; // todo - fix this
+ }
+ localQueue = queue;
}
else
{
- nonLocalQueues.add(queue);
+ nonLocalQueues.add(queue);
}
-
+
return true;
}
public void clear()
{
nonLocalQueues.clear();
-
- localQueues.clear();
-
+
+ localQueue = null;
+
target = 0;
}
public boolean contains(Receiver queue)
{
- return localQueues.contains(queue) || nonLocalQueues.contains(queue);
+ return localQueue == queue || nonLocalQueues.contains(queue);
}
public Iterator iterator()
{
List queues = new ArrayList();
-
- queues.addAll(localQueues);
+ if (localQueue != null)
+ {
+ queues.add(localQueue);
+ }
+
queues.addAll(nonLocalQueues);
-
+
return queues.iterator();
}
public boolean remove(Receiver queue)
- {
- if (localQueues.contains(queue))
+ {
+ if (localQueue == queue)
{
- if (localQueues.remove(queue))
- {
- if (target >= localQueues.size() - 1)
- {
- target = localQueues.size() - 1;
- }
- return true;
- }
- else
- {
- return false;
- }
+ localQueue = null;
+
+ return true;
}
else
{
@@ -152,95 +145,81 @@
{
return false;
}
- }
+ }
}
public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
{
if (trace) { log.trace(this + " routing ref " + reference); }
-
+
//Favour the local queue
-
- if (!localQueues.isEmpty())
+
+ if (localQueue != null)
{
- checkTargetLocal();
- ClusteredQueue queue = (ClusteredQueue)localQueues.get(target);
+ //The only time the local queue won't accept is if the selector doesn't
+ //match - in which case it won't match at any other nodes too so no point
+ //in trying them
- queue = (ClusteredQueue)localQueues.get(target);
+ Delivery del = localQueue.handle(observer, reference, tx);
- Delivery del = queue.handle(observer, reference, tx);
+ if (trace) { log.trace(this + " routed to local queue, it returned " + del); }
- if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
-
- incTargetLocal();
-
- //Again, if the selector doesn't match then it won't on any others so no point trying them
- return del;
+ return del;
}
else
{
- //There is no local shared queue
+ //There is no local shared queue
//We round robin among the rest
-
+
if (!nonLocalQueues.isEmpty())
{
ClusteredQueue queue = (ClusteredQueue)nonLocalQueues.get(target);
-
+
queue = (ClusteredQueue)nonLocalQueues.get(target);
-
+
Delivery del = queue.handle(observer, reference, tx);
-
+
if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
-
+
incTarget();
//Again, if the selector doesn't match then it won't on any others so no point trying them
return del;
- }
+ }
}
-
+
if (trace) { log.trace(this + " no queues to route to so return null"); }
-
+
return null;
}
-
- private void incTarget()
- {
- target++;
- if (target == nonLocalQueues.size())
- {
- target = 0;
- }
- }
+ private void incTarget()
+ {
+ target++;
- private void incTargetLocal()
- {
- target++;
+ if (target == nonLocalQueues.size())
+ {
+ target = 0;
+ }
+ }
- checkTargetLocal();
- }
+ public List getQueues()
+ {
+ List queues = new ArrayList();
- private void checkTargetLocal() {
- if (target == localQueues.size())
- {
- target = 0;
- }
- }
+ if (localQueue != null)
+ {
+ queues.add(localQueue);
+ }
- public List getQueues()
- {
- List queues = new ArrayList();
+ queues.addAll(nonLocalQueues);
- queues.addAll(localQueues);
- queues.addAll(nonLocalQueues);
+ return queues;
+ }
- return queues;
- }
-
public int numberOfReceivers()
{
- return nonLocalQueues.size() + localQueues.size();
+ return nonLocalQueues.size() + (localQueue != null ? 1 : 0);
}
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -43,7 +43,7 @@
interface PostOfficeInternal extends ClusteredPostOffice
{
void addBindingFromCluster(int nodeId, String queueName, String condition,
- String filterString, long channelId, boolean durable)
+ String filterString, long channelId, boolean durable, boolean failed)
throws Exception;
void removeBindingFromCluster(int nodeId, String queueName)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -263,5 +263,9 @@
{
return true;
}
-
+
+ public String toString()
+ {
+ return "RemoteQueueStub(node=" + this.nodeId + " name=" + this.name + " channelId=" + this.id + ")";
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java 2006-11-02 20:20:07 UTC (rev 1548)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java 2006-11-03 00:21:22 UTC (rev 1549)
@@ -54,6 +54,8 @@
Object execute(PostOfficeInternal office) throws Exception
{
+ // TODO: Due to failoever we can't guarantee unique key by nodeId and queueName any more.
+ // We might need to revist this
office.removeBindingFromCluster(nodeId, queueName);
return null;
More information about the jboss-cvs-commits
mailing list