[jboss-cvs] JBoss Messaging SVN: r1550 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/server/endpoint messaging/core/plugin/contract messaging/core/plugin/postoffice messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 2 20:05:29 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-11-02 20:05:25 -0500 (Thu, 02 Nov 2006)
New Revision: 1550
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-519 - Implementing field failedOver HashMap
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-03 01:05:25 UTC (rev 1550)
@@ -217,7 +217,8 @@
{
postOfficeToUse = queuePostOffice;
}
- binding = postOfficeToUse.getBindingforChannelId(-1,oldchannelID);
+ // this is a Clustered operation... so postOffice here must be Clustered
+ binding = ((ClusteredPostOffice)postOfficeToUse).getBindingforChannelId(oldchannelID);
if (binding==null)
{
throw new JMSException("Can't find failed over channel " + oldchannelID);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-11-03 01:05:25 UTC (rev 1550)
@@ -59,4 +59,6 @@
Collection listAllBindingsForCondition(String condition) throws Exception;
public void failOver(int nodeId) throws Exception;
+
+ Binding getBindingforChannelId(long channelId) throws Exception;
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2006-11-03 01:05:25 UTC (rev 1550)
@@ -70,8 +70,6 @@
*/
Binding getBindingForQueueName(String queueName) throws Exception;
- Binding getBindingforChannelId(int parameterNodeId, long channelId) throws Exception;
-
/**
* Route a reference.
* @param ref
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-11-03 01:05:25 UTC (rev 1550)
@@ -280,47 +280,6 @@
}
}
- public Binding getBindingforChannelId(int parameterNodeId, long channelId) throws Exception
- {
- if (parameterNodeId<0) parameterNodeId=this.nodeId;
- log.info("DefaultPostOffice::getBindingforChannelId(" + parameterNodeId + " ," + channelId + ")");
-
- lock.readLock().acquire();
-
- try
- {
- Map nameMap = (Map)nameMaps.get(new Integer(parameterNodeId));
-
- Binding binding = null;
-
- if (nameMap != null)
- {
- for (Iterator iterbindings = nameMap.values().iterator();iterbindings.hasNext();)
- {
- Binding itemBinding = (Binding)iterbindings.next();
- if (itemBinding.getQueue().getChannelID()==channelId)
- {
- binding=itemBinding;
- break;
- }
- }
-
- }
- else
- {
- log.info("nameMap is null");
- }
-
-
- log.info("Returned " + binding);
- return binding;
- }
- finally
- {
- lock.readLock().release();
- }
- }
-
public void recover() throws Exception
{
//NOOP
@@ -736,15 +695,15 @@
protected void addToNameMap(Binding binding)
{
Map nameMap = (Map)nameMaps.get(new Integer(binding.getNodeId()));
-
+
if (nameMap == null)
{
nameMap = new LinkedHashMap();
-
+
nameMaps.put(new Integer(binding.getNodeId()), nameMap);
}
-
- nameMap.put(binding.getQueue().getName(), binding);
+
+ nameMap.put(binding.getQueue().getName(), binding);
}
protected void addToConditionMap(Binding binding)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java 2006-11-03 01:05:25 UTC (rev 1550)
@@ -24,6 +24,7 @@
import java.util.List;
import org.jboss.messaging.core.Router;
+import org.jboss.messaging.core.Receiver;
/**
* A ClusterRouter
@@ -37,6 +38,9 @@
public interface ClusterRouter extends Router
{
List getQueues();
-
+
ClusteredQueue getLocalQueue();
+
+ boolean add(Receiver receiver, boolean failedOver);
+
}
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-11-03 01:05:25 UTC (rev 1550)
@@ -22,13 +22,7 @@
package org.jboss.messaging.core.plugin.postoffice.cluster;
import java.io.*;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
@@ -129,7 +123,11 @@
private ClusterRouterFactory routerFactory;
private Map routerMap;
-
+
+ /** List of failed over bindings.
+ * Map<int nodeId, Map<channelId,Binding>>*/
+ private Map failedBindings;
+
private StatsSender statsSender;
private boolean started;
@@ -227,6 +225,8 @@
this.routerFactory = rf;
routerMap = new HashMap();
+
+ failedBindings = new LinkedHashMap();
statsSender = new StatsSender(this, statsSendPeriod);
@@ -582,7 +582,7 @@
binding = (Binding)nameMap.get(queueName);
}
- if (binding != null)
+ if (binding != null && failed)
{
throw new IllegalArgumentException(this.nodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
}
@@ -1154,8 +1154,56 @@
}
}
+   /**
+    * Looks up the binding whose queue has the given channel id, for this post office's node.
+    *
+    * The failed-over bindings recorded for this node are consulted first. If nothing is found
+    * there, the bindings in this node's name map are scanned one by one and the first whose
+    * queue reports a matching channel id is used. The whole lookup runs under the read lock.
+    *
+    * @param channelId the channel id to search for
+    * @return the matching binding, or null when no binding matches
+    * @throws Exception declared by the interface; NOTE(review): presumably raised when the
+    *         lock acquisition is interrupted - confirm against the lock implementation
+    */
+   public Binding getBindingforChannelId(long channelId) throws Exception
+   {
+      lock.readLock().acquire();
+      try
+      {
+         Binding found = null;
+
+         // First choice: a binding that was registered as failed over for this node.
+         Map failedForNode = (Map)failedBindings.get(new Integer(nodeId));
+         if (failedForNode != null)
+         {
+            found = (Binding)failedForNode.get(new Long(channelId));
+         }
+
+         if (found == null)
+         {
+            // Fallback: the name map is keyed by queue name, so search its values by channel id.
+            Map bindingsByName = (Map)nameMaps.get(new Integer(nodeId));
+
+            if (bindingsByName == null)
+            {
+               log.info("nameMap is null");
+            }
+            else
+            {
+               Iterator candidates = bindingsByName.values().iterator();
+               while (found == null && candidates.hasNext())
+               {
+                  Binding candidate = (Binding)candidates.next();
+                  if (candidate.getQueue().getChannelID() == channelId)
+                  {
+                     found = candidate;
+                  }
+               }
+            }
+         }
+
+         log.info("Returned " + found);
+         return found;
+      }
+      finally
+      {
+         lock.readLock().release();
+      }
+   }
+
+
+
public String printBindingInformation()
{
@@ -1190,36 +1238,65 @@
// Protected ---------------------------------------------------------------------------------------
-
+
+
+   /**
+    * Registers a binding, sending failed bindings to the failed-over maps and every other
+    * binding to the superclass name map.
+    *
+    * @param binding the binding to register
+    */
+   protected void addToNameMap(Binding binding)
+   {
+      if (binding.isFailed())
+      {
+         // Failed bindings are tracked by channel id rather than by queue name.
+         addIntoFailedMaps(binding);
+         return;
+      }
+
+      super.addToNameMap(binding);
+   }
+
+   /**
+    * Records a binding in failedBindings, keyed first by the binding's node id and then by
+    * the channel id of its queue.
+    *
+    * @param binding the binding to record
+    */
+   private void addIntoFailedMaps(Binding binding)
+   {
+      Integer nodeKey = new Integer(binding.getNodeId());
+
+      Map channelMap = (Map)failedBindings.get(nodeKey);
+
+      if (channelMap == null)
+      {
+         channelMap = new LinkedHashMap();
+
+         // Store the new per-node map. Putting failedBindings itself here would drop the
+         // per-node map and leave the outer map holding a reference to itself, so the
+         // binding added below could never be looked up again.
+         failedBindings.put(nodeKey, channelMap);
+      }
+
+      channelMap.put(new Long(binding.getQueue().getChannelID()), binding);
+   }
+
+
protected void addToConditionMap(Binding binding)
{
String condition = binding.getCondition();
-
+
ClusteredBindings bindings = (ClusteredBindings)conditionMap.get(condition);
-
+
if (bindings == null)
{
bindings = new DefaultClusteredBindings(nodeId);
-
+
conditionMap.put(condition, bindings);
}
-
+
bindings.addBinding(binding);
-
+
String queueName = binding.getQueue().getName();
-
+
ClusterRouter router = (ClusterRouter)routerMap.get(queueName);
-
+
if (router == null)
{
router = routerFactory.createRouter();
-
+
routerMap.put(queueName, router);
-
+
bindings.addRouter(queueName, router);
}
-
- router.add(binding.getQueue());
+
+ // todo: Maybe we should have isFailed as a property of Queue instead of Binding, so we won't need to change this signature.
+ router.add(binding.getQueue(),binding.isFailed());
}
protected void removeFromConditionMap(Binding binding)
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-11-03 00:21:22 UTC (rev 1549)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java 2006-11-03 01:05:25 UTC (rev 1550)
@@ -55,6 +55,8 @@
//MUST be an arraylist for fast index access
private ArrayList nonLocalQueues;
+ private ArrayList failedOverQueues;
+
private ClusteredQueue localQueue;
private int target;
@@ -62,6 +64,7 @@
public DefaultRouter()
{
nonLocalQueues = new ArrayList();
+ failedOverQueues = new ArrayList();
}
public int size()
@@ -76,16 +79,27 @@
public boolean add(Receiver receiver)
{
+ return add(receiver,false);
+ }
+
+ public boolean add(Receiver receiver, boolean failedOver)
+ {
ClusteredQueue queue = (ClusteredQueue)receiver;
if (queue.isLocal())
{
- if (localQueue != null)
+ if (failedOver)
{
- //throw new IllegalStateException("Already has local queue");
- return true; // todo - fix this
+ failedOverQueues.add(receiver);
}
- localQueue = queue;
+ else
+ {
+ if (localQueue != null)
+ {
+ throw new IllegalStateException("Already has local queue");
+ }
+ localQueue = queue;
+ }
}
else
{
More information about the jboss-cvs-commits
mailing list