[jboss-cvs] JBoss Messaging SVN: r3295 - in branches/Branch_MC_Integration_New: src/etc/server/default/deploy and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 7 09:03:58 EST 2007
Author: ataylor
Date: 2007-11-07 09:03:57 -0500 (Wed, 07 Nov 2007)
New Revision: 3295
Modified:
branches/Branch_MC_Integration_New/build-messaging.xml
branches/Branch_MC_Integration_New/src/etc/server/default/deploy/jbm-beans.xml
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/Configuration.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/DestinationJNDIMapper.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/destination/DestinationDeployer.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java
branches/Branch_MC_Integration_New/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/Branch_MC_Integration_New/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
Log:
initial microcontainer integration, made configuration non static
Modified: branches/Branch_MC_Integration_New/build-messaging.xml
===================================================================
--- branches/Branch_MC_Integration_New/build-messaging.xml 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/build-messaging.xml 2007-11-07 14:03:57 UTC (rev 3295)
@@ -360,9 +360,6 @@
<mkdir dir="${build.jar}/META-INF"/>
<mkdir dir="${build.jar}/xmdesc"/>
<copy file="${build.etc}/VERSION" toFile="${build.jar}/VERSION"/>
- <copy todir="${build.jar}/xmdesc">
- <fileset dir="${source.etc}/xmdesc" includes="*.xml"/>
- </copy>
<copy todir="${build.jar}" file="${source.etc}/aop-messaging-server.xml"/>
<copy todir="${build.jar}" file="${source.etc}/aop-messaging-client.xml"/>
</target>
Modified: branches/Branch_MC_Integration_New/src/etc/server/default/deploy/jbm-beans.xml
===================================================================
--- branches/Branch_MC_Integration_New/src/etc/server/default/deploy/jbm-beans.xml 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/etc/server/default/deploy/jbm-beans.xml 2007-11-07 14:03:57 UTC (rev 3295)
@@ -27,6 +27,9 @@
<property name="connector">
<inject bean="Remoting"/>
</property>
+ <property name="configuration">
+ <inject bean="Configuration"/>
+ </property>
</bean>
@@ -73,6 +76,9 @@
<property name="clusterNotifier">
<inject bean="ClusterNotifier"/>
</property>
+ <property name="configuration">
+ <inject bean="Configuration"/>
+ </property>
<property name="channelFactory"><inject bean="JChannelFactory"/></property>
<property name="createTablesOnStartup">true</property>
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/Configuration.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/Configuration.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/Configuration.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -40,64 +40,65 @@
private static final String WRITE_ATTR = "write";
private static final String CREATE_ATTR = "create";
private static final String NAME_ATTR = "name";
- private static PropertyChangeSupport propertyChangeSupport;
- private static Integer _serverPeerID = -1;
- private static String _defaultQueueJNDIContext = "";
- private static String _defaultTopicJNDIContext = "";
- private static String _securityDomain;
- private static HashSet<Role> _securityConfig;
- private static String _defaultDLQ;
+
+ private PropertyChangeSupport propertyChangeSupport;
+ private Integer _serverPeerID = -1;
+ private String _defaultQueueJNDIContext = "";
+ private String _defaultTopicJNDIContext = "";
+ private String _securityDomain;
+ private HashSet<Role> _securityConfig;
+ private String _defaultDLQ;
// The default maximum number of delivery attempts before sending to DLQ - can be overridden on
// the destination
- private static Integer _defaultMaxDeliveryAttempts = 10;
- protected static String _defaultExpiryQueue;
+ private Integer _defaultMaxDeliveryAttempts = 10;
+ protected String _defaultExpiryQueue;
- private static Long _defaultRedeliveryDelay = (long) 0;
+ private Long _defaultRedeliveryDelay = (long) 0;
- private static Long _messageCounterSamplePeriod = (long) 10000;// Default is 1 minute
- private static Long _failoverStartTimeout = (long) (60 * 1000);
+ private Long _messageCounterSamplePeriod = (long) 10000;// Default is 1 minute
+ private Long _failoverStartTimeout = (long) (60 * 1000);
// Default is 5 minutes
- private static Long _failoverCompleteTimeout = (long) (5 * 60 * 1000);
+ private Long _failoverCompleteTimeout = (long) (5 * 60 * 1000);
- private static Integer _defaultMessageCounterHistoryDayLimit = 1;
+ private Integer _defaultMessageCounterHistoryDayLimit = 1;
- private static String _clusterPullConnectionFactoryName;
+ private String _clusterPullConnectionFactoryName;
- private static Boolean _useXAForMessagePull = false;
+ private Boolean _useXAForMessagePull = false;
- private static Boolean _defaultPreserveOrdering = false;
+ private Boolean _defaultPreserveOrdering = false;
- private static Long _recoverDeliveriesTimeout = (long) (5 * 60 * 1000);
+ private Long _recoverDeliveriesTimeout = (long) (5 * 60 * 1000);
- private static String _suckerPassword;
+ private String _suckerPassword;
//Global override for strict behaviour
- private static Boolean _strictTck = false;
+ private Boolean _strictTck = false;
//From a system property - this overrides
- private static Boolean _strictTckProperty = false;
+ private Boolean _strictTckProperty = false;
- private static String _postOfficeName;
+ private String _postOfficeName;
- private static Boolean _clustered = false;
+ private Boolean _clustered = false;
- private static Long _stateTimeout = (long) 5000;
+ private Long _stateTimeout = (long) 5000;
- private static Long _castTimeout = (long) 5000;
+ private Long _castTimeout = (long) 5000;
- private static String _groupName;
+ private String _groupName;
- private static String _controlChannelName;
+ private String _controlChannelName;
- private static String _dataChannelName;
+ private String _dataChannelName;
- private static String _channelPartitionName;
+ private String _channelPartitionName;
- private static Integer _maxConcurrentReplications = 25;
+ private Integer _maxConcurrentReplications = 25;
- private static Boolean _useJGroupsWorkaround = false;
+ private Boolean _useJGroupsWorkaround = false;
public void start() throws Exception
{
@@ -160,7 +161,7 @@
}
- private static Boolean getBoolean(Element e, String name, Boolean def)
+ private Boolean getBoolean(Element e, String name, Boolean def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
@@ -171,7 +172,7 @@
}
- private static Integer getInteger(Element e, String name, Integer def)
+ private Integer getInteger(Element e, String name, Integer def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
@@ -181,7 +182,7 @@
return def;
}
- private static Long getLong(Element e, String name, Long def)
+ private Long getLong(Element e, String name, Long def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
@@ -191,7 +192,7 @@
return def;
}
- private static String getString(Element e, String name, String def)
+ private String getString(Element e, String name, String def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
@@ -201,116 +202,116 @@
return def;
}
- public static void addPropertyChangeListener(
+ public void addPropertyChangeListener(
PropertyChangeListener listener)
{
propertyChangeSupport.addPropertyChangeListener(listener);
}
- public static int getServerPeerID()
+ public int getServerPeerID()
{
return _serverPeerID;
}
- public static void setServerPeerID(int serverPeerID)
+ public void setServerPeerID(int serverPeerID)
{
_serverPeerID = serverPeerID;
}
- public static String getDefaultQueueJNDIContext()
+ public String getDefaultQueueJNDIContext()
{
return _defaultQueueJNDIContext;
}
- public static void setDefaultQueueJNDIContext(String defaultQueueJNDIContext)
+ public void setDefaultQueueJNDIContext(String defaultQueueJNDIContext)
{
_defaultQueueJNDIContext = defaultQueueJNDIContext;
}
- public static String getDefaultTopicJNDIContext()
+ public String getDefaultTopicJNDIContext()
{
return _defaultTopicJNDIContext;
}
- public static void setDefaultTopicJNDIContext(String defaultTopicJNDIContext)
+ public void setDefaultTopicJNDIContext(String defaultTopicJNDIContext)
{
_defaultTopicJNDIContext = defaultTopicJNDIContext;
}
- public static void setSecurityDomain(String securityDomain) throws Exception
+ public void setSecurityDomain(String securityDomain) throws Exception
{
_securityDomain = securityDomain;
}
- public static String getSecurityDomain()
+ public String getSecurityDomain()
{
return _securityDomain;
}
- public static HashSet<Role> getSecurityConfig()
+ public HashSet<Role> getSecurityConfig()
{
return _securityConfig;
}
- public static void setSecurityConfig(HashSet<Role> securityConfig)
+ public void setSecurityConfig(HashSet<Role> securityConfig)
{
_securityConfig = securityConfig;
}
- public static String getDefaultDLQ()
+ public String getDefaultDLQ()
{
return _defaultDLQ;
}
- public static void setDefaultDLQ(String defaultDLQ)
+ public void setDefaultDLQ(String defaultDLQ)
{
_defaultDLQ = defaultDLQ;
}
- public static int getDefaultMaxDeliveryAttempts()
+ public int getDefaultMaxDeliveryAttempts()
{
return _defaultMaxDeliveryAttempts;
}
- public static void setDefaultMaxDeliveryAttempts(int defaultMaxDeliveryAttempts)
+ public void setDefaultMaxDeliveryAttempts(int defaultMaxDeliveryAttempts)
{
_defaultMaxDeliveryAttempts = defaultMaxDeliveryAttempts;
}
- public static String getDefaultExpiryQueue()
+ public String getDefaultExpiryQueue()
{
return _defaultExpiryQueue;
}
- public static void setDefaultExpiryQueue(String defaultExpiryQueue)
+ public void setDefaultExpiryQueue(String defaultExpiryQueue)
{
_defaultExpiryQueue = defaultExpiryQueue;
}
- public static long getDefaultRedeliveryDelay()
+ public long getDefaultRedeliveryDelay()
{
return _defaultRedeliveryDelay;
}
- public static void setDefaultRedeliveryDelay(long defaultRedeliveryDelay)
+ public void setDefaultRedeliveryDelay(long defaultRedeliveryDelay)
{
_defaultRedeliveryDelay = defaultRedeliveryDelay;
}
- public static long getMessageCounterSamplePeriod()
+ public long getMessageCounterSamplePeriod()
{
return _messageCounterSamplePeriod;
}
- public static void setMessageCounterSamplePeriod(long messageCounterSamplePeriod)
+ public void setMessageCounterSamplePeriod(long messageCounterSamplePeriod)
{
if (messageCounterSamplePeriod < 1000)
{
@@ -321,34 +322,34 @@
}
- public static long getFailoverStartTimeout()
+ public long getFailoverStartTimeout()
{
return _failoverStartTimeout;
}
- public static void setFailoverStartTimeout(long failoverStartTimeout)
+ public void setFailoverStartTimeout(long failoverStartTimeout)
{
_failoverStartTimeout = failoverStartTimeout;
}
- public static long getFailoverCompleteTimeout()
+ public long getFailoverCompleteTimeout()
{
return _failoverCompleteTimeout;
}
- public static void setFailoverCompleteTimeout(long failoverCompleteTimeout)
+ public void setFailoverCompleteTimeout(long failoverCompleteTimeout)
{
_failoverCompleteTimeout = failoverCompleteTimeout;
}
- public static int getDefaultMessageCounterHistoryDayLimit()
+ public int getDefaultMessageCounterHistoryDayLimit()
{
return _defaultMessageCounterHistoryDayLimit;
}
- public static void setDefaultMessageCounterHistoryDayLimit(int defaultMessageCounterHistoryDayLimit)
+ public void setDefaultMessageCounterHistoryDayLimit(int defaultMessageCounterHistoryDayLimit)
{
if (defaultMessageCounterHistoryDayLimit < -1)
{
@@ -358,177 +359,177 @@
}
- public static String getClusterPullConnectionFactoryName()
+ public String getClusterPullConnectionFactoryName()
{
return _clusterPullConnectionFactoryName;
}
- public static void setClusterPullConnectionFactoryName(String clusterPullConnectionFactoryName)
+ public void setClusterPullConnectionFactoryName(String clusterPullConnectionFactoryName)
{
_clusterPullConnectionFactoryName = clusterPullConnectionFactoryName;
}
- public static boolean isUseXAForMessagePull()
+ public boolean isUseXAForMessagePull()
{
return _useXAForMessagePull;
}
- public static void setUseXAForMessagePull(boolean useXAForMessagePull)
+ public void setUseXAForMessagePull(boolean useXAForMessagePull)
{
_useXAForMessagePull = useXAForMessagePull;
}
- public static boolean isDefaultPreserveOrdering()
+ public boolean isDefaultPreserveOrdering()
{
return _defaultPreserveOrdering;
}
- public static void setDefaultPreserveOrdering(boolean defaultPreserveOrdering)
+ public void setDefaultPreserveOrdering(boolean defaultPreserveOrdering)
{
_defaultPreserveOrdering = defaultPreserveOrdering;
}
- public static long getRecoverDeliveriesTimeout()
+ public long getRecoverDeliveriesTimeout()
{
return _recoverDeliveriesTimeout;
}
- public static void setRecoverDeliveriesTimeout(long recoverDeliveriesTimeout)
+ public void setRecoverDeliveriesTimeout(long recoverDeliveriesTimeout)
{
_recoverDeliveriesTimeout = recoverDeliveriesTimeout;
}
- public static String getSuckerPassword()
+ public String getSuckerPassword()
{
return _suckerPassword;
}
- public static void setSuckerPassword(String suckerPassword)
+ public void setSuckerPassword(String suckerPassword)
{
_suckerPassword = suckerPassword;
}
- public static boolean isStrictTck()
+ public boolean isStrictTck()
{
return _strictTck || _strictTckProperty;
}
- public static void setStrictTck(boolean strictTck)
+ public void setStrictTck(boolean strictTck)
{
_strictTck = strictTck || _strictTckProperty;
}
- public static String getPostOfficeName()
+ public String getPostOfficeName()
{
return _postOfficeName;
}
- public static void setPostOfficeName(String postOfficeName)
+ public void setPostOfficeName(String postOfficeName)
{
_postOfficeName = postOfficeName;
}
- public static boolean isClustered()
+ public boolean isClustered()
{
return _clustered;
}
- public static void setClustered(boolean clustered)
+ public void setClustered(boolean clustered)
{
_clustered = clustered;
}
- public static long getStateTimeout()
+ public long getStateTimeout()
{
return _stateTimeout;
}
- public static void setStateTimeout(long stateTimeout)
+ public void setStateTimeout(long stateTimeout)
{
_stateTimeout = stateTimeout;
}
- public static long getCastTimeout()
+ public long getCastTimeout()
{
return _castTimeout;
}
- public static void setCastTimeout(long castTimeout)
+ public void setCastTimeout(long castTimeout)
{
_castTimeout = castTimeout;
}
- public static String getGroupName()
+ public String getGroupName()
{
return _groupName;
}
- public static void setGroupName(String groupName)
+ public void setGroupName(String groupName)
{
_groupName = groupName;
}
- public static String getControlChannelName()
+ public String getControlChannelName()
{
return _controlChannelName;
}
- public static void setControlChannelName(String controlChannelName)
+ public void setControlChannelName(String controlChannelName)
{
_controlChannelName = controlChannelName;
}
- public static String getDataChannelName()
+ public String getDataChannelName()
{
return _dataChannelName;
}
- public static void setDataChannelName(String dataChannelName)
+ public void setDataChannelName(String dataChannelName)
{
_dataChannelName = dataChannelName;
}
- public static String getChannelPartitionName()
+ public String getChannelPartitionName()
{
return _channelPartitionName;
}
- public static void setChannelPartitionName(String channelPartitionName)
+ public void setChannelPartitionName(String channelPartitionName)
{
_channelPartitionName = channelPartitionName;
}
- public static int getMaxConcurrentReplications()
+ public int getMaxConcurrentReplications()
{
return _maxConcurrentReplications;
}
- public static void setMaxConcurrentReplications(int maxConcurrentReplications)
+ public void setMaxConcurrentReplications(int maxConcurrentReplications)
{
_maxConcurrentReplications = maxConcurrentReplications;
}
- public static boolean isUseJGroupsWorkaround()
+ public boolean isUseJGroupsWorkaround()
{
return _useJGroupsWorkaround;
}
- public static void setUseJGroupsWorkaround(boolean useJGroupsWorkaround)
+ public void setUseJGroupsWorkaround(boolean useJGroupsWorkaround)
{
_useJGroupsWorkaround = useJGroupsWorkaround;
}
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/DestinationJNDIMapper.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/DestinationJNDIMapper.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/DestinationJNDIMapper.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -85,7 +85,7 @@
if (jndiName == null)
{
parentContext = destination.isQueue() ?
- Configuration.getDefaultQueueJNDIContext() : Configuration.getDefaultTopicJNDIContext();
+ serverPeer.getConfiguration().getDefaultQueueJNDIContext() : serverPeer.getConfiguration().getDefaultTopicJNDIContext();
jndiNameInContext = destination.getName();
jndiName = parentContext + "/" + jndiNameInContext;
@@ -231,8 +231,8 @@
initialContext = new InitialContext();
// see if the default queue/topic contexts are there, and if they're not, create them
- createContext(Configuration.getDefaultQueueJNDIContext());
- createContext(Configuration.getDefaultTopicJNDIContext());
+ createContext(serverPeer.getConfiguration().getDefaultQueueJNDIContext());
+ createContext(serverPeer.getConfiguration().getDefaultTopicJNDIContext());
}
public void stop() throws Exception
@@ -252,8 +252,8 @@
unregisterDestination((ManagedDestination)i.next());
}
- initialContext.unbind(Configuration.getDefaultQueueJNDIContext());
- initialContext.unbind(Configuration.getDefaultTopicJNDIContext());
+ initialContext.unbind(serverPeer.getConfiguration().getDefaultQueueJNDIContext());
+ initialContext.unbind(serverPeer.getConfiguration().getDefaultTopicJNDIContext());
initialContext.close();
}
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/ServerPeer.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/ServerPeer.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -139,12 +139,13 @@
private Connector connector;
+ private Configuration configuration;
+
// Constructors ---------------------------------------------------------------------------------
public ServerPeer() throws Exception
{
// Some wired components need to be started here
- securityStore = new SecurityMetadataStore();
version = Version.instance();
@@ -166,7 +167,7 @@
return;
}
- if (Configuration.getServerPeerID() < 0)
+ if (configuration.getServerPeerID() < 0)
{
throw new IllegalStateException("ServerPeerID not set");
}
@@ -176,12 +177,14 @@
loadClientAOPConfig();
loadServerAOPConfig();
- ((JDBCPersistenceManager) persistenceManager).injectNodeID(Configuration.getServerPeerID());
+ ((JDBCPersistenceManager) persistenceManager).injectNodeID(configuration.getServerPeerID());
// We get references to some plugins lazily to avoid problems with circular MBean
// dependencies
// Create the wired components
+
+ securityStore = new SecurityMetadataStore(this);
messageIDManager = new IDManager("MESSAGE_ID", 4096, persistenceManager);
channelIDManager = new IDManager("CHANNEL_ID", 10, persistenceManager);
transactionIDManager = new IDManager("TRANSACTION_ID", 1024, persistenceManager);
@@ -195,12 +198,12 @@
connectionFactoryDeployer = new ConnectionFactoryDeployer(this, connector);
txRepository =
new TransactionRepository(persistenceManager, messageStore, transactionIDManager);
- messageCounterManager = new MessageCounterManager(Configuration.getMessageCounterSamplePeriod());
- Configuration.addPropertyChangeListener(new PropertyChangeListener()
+ messageCounterManager = new MessageCounterManager(configuration.getMessageCounterSamplePeriod());
+ configuration.addPropertyChangeListener(new PropertyChangeListener()
{
public void propertyChange(PropertyChangeEvent evt)
{
- messageCounterManager.reschedule(Configuration.getMessageCounterSamplePeriod());
+ messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
}
});
//inverted dependancy, post office will now inject this
@@ -208,18 +211,18 @@
clusterNotifier.registerListener(connectionManager);
clusterNotifier.registerListener(connFactoryJNDIMapper);
- failoverWaiter = new FailoverWaiter(Configuration.getServerPeerID(), Configuration.getFailoverStartTimeout(), Configuration.getFailoverCompleteTimeout(), txRepository);
+ failoverWaiter = new FailoverWaiter(configuration.getServerPeerID(), configuration.getFailoverStartTimeout(), configuration.getFailoverCompleteTimeout(), txRepository);
clusterNotifier.registerListener(failoverWaiter);
- if (Configuration.getSuckerPassword() == null)
+ if (configuration.getSuckerPassword() == null)
{
- Configuration.setSuckerPassword(SecurityMetadataStore.DEFAULT_SUCKER_USER_PASSWORD);
+ configuration.setSuckerPassword(SecurityMetadataStore.DEFAULT_SUCKER_USER_PASSWORD);
}
- if (Configuration.getClusterPullConnectionFactoryName() != null)
+ if (configuration.getClusterPullConnectionFactoryName() != null)
{
- clusterConnectionManager = new ClusterConnectionManager(Configuration.isUseXAForMessagePull(), Configuration.getServerPeerID(),
- Configuration.getClusterPullConnectionFactoryName(), Configuration.isDefaultPreserveOrdering(),
- SecurityMetadataStore.SUCKER_USER, Configuration.getSuckerPassword());
+ clusterConnectionManager = new ClusterConnectionManager(configuration.isUseXAForMessagePull(), configuration.getServerPeerID(),
+ configuration.getClusterPullConnectionFactoryName(), configuration.isDefaultPreserveOrdering(),
+ SecurityMetadataStore.SUCKER_USER, configuration.getSuckerPassword());
clusterNotifier.registerListener(clusterConnectionManager);
}
@@ -234,7 +237,7 @@
connectorManager.start();
memoryManager.start();
messageStore.start();
- securityStore.setSuckerPassword(Configuration.getSuckerPassword());
+ securityStore.setSuckerPassword(configuration.getSuckerPassword());
securityStore.start();
txRepository.start();
clusterConnectionManager.start();
@@ -255,7 +258,7 @@
//We do this right at the end otherwise it can start handling invocations before we are properly started
JMSServerInvocationHandler.setClosed(false);
- if (Configuration.isClustered())
+ if (configuration.isClustered())
{
Replicator rep = (Replicator) postOffice;
@@ -277,7 +280,7 @@
started = true;
log.info("JBoss Messaging " + getVersion().getProviderVersion() + " server [" +
- Configuration.getServerPeerID() + "] started");
+ configuration.getServerPeerID() + "] started");
}
catch (Throwable t)
{
@@ -774,14 +777,14 @@
{
Queue dlq = null;
- if (Configuration.getDefaultDLQ() != null)
+ if (configuration.getDefaultDLQ() != null)
{
- Binding binding = postOffice.getBindingForQueueName(Configuration.getDefaultDLQ());
+ Binding binding = postOffice.getBindingForQueueName(configuration.getDefaultDLQ());
if (binding == null)
{
- throw new IllegalStateException("Cannot find binding for queue " + Configuration.getDefaultDLQ());
+ throw new IllegalStateException("Cannot find binding for queue " + configuration.getDefaultDLQ());
}
Queue queue = binding.queue;
@@ -799,16 +802,16 @@
{
Queue expiryQueue = null;
- if (Configuration.getDefaultExpiryQueue() != null)
+ if (configuration.getDefaultExpiryQueue() != null)
{
- if (Configuration.getDefaultDLQ() != null)
+ if (configuration.getDefaultDLQ() != null)
{
- Binding binding = postOffice.getBindingForQueueName(Configuration.getDefaultExpiryQueue());
+ Binding binding = postOffice.getBindingForQueueName(configuration.getDefaultExpiryQueue());
if (binding == null)
{
- throw new IllegalStateException("Cannot find binding for queue " + Configuration.getDefaultExpiryQueue());
+ throw new IllegalStateException("Cannot find binding for queue " + configuration.getDefaultExpiryQueue());
}
Queue queue = binding.queue;
@@ -940,7 +943,7 @@
public String toString()
{
- return "ServerPeer[" + Configuration.getServerPeerID() + "]";
+ return "ServerPeer[" + configuration.getServerPeerID() + "]";
}
// Package protected ----------------------------------------------------------------------------
@@ -1000,6 +1003,16 @@
}
+ public Configuration getConfiguration()
+ {
+ return configuration;
+ }
+
+ public void setConfiguration(Configuration configuration)
+ {
+ this.configuration = configuration;
+ }
+
public Connector getConnector()
{
return connector;
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -29,7 +29,6 @@
import org.jboss.jms.client.plugin.LoadBalancingPolicy;
import org.jboss.jms.client.plugin.NoLoadBalancingLoadBalancingFactory;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
-import org.jboss.jms.server.Configuration;
import org.jboss.jms.server.ConnectionFactoryManager;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
@@ -158,10 +157,10 @@
boolean creatingClustered = (supportsFailover || supportsLoadBalancing) && replicator != null;
//The server peer strict setting overrides the connection factory
- boolean useStrict = Configuration.isStrictTck() || strictTck;
+ boolean useStrict = serverPeer.getConfiguration().isStrictTck() || strictTck;
ClientConnectionFactoryDelegate localDelegate =
- new ClientConnectionFactoryDelegate(uniqueName, id, Configuration.getServerPeerID(),
+ new ClientConnectionFactoryDelegate(uniqueName, id, serverPeer.getConfiguration().getServerPeerID(),
locatorURI, version, clientPing, useStrict);
log.debug(this + " created local delegate " + localDelegate);
@@ -250,7 +249,7 @@
}
}
- if (trace) { log.trace("Removing delegate from delegates list with key=" + uniqueName + " at serverPeerID=" + Configuration.getServerPeerID()); }
+ if (trace) { log.trace("Removing delegate from delegates list with key=" + uniqueName + " at serverPeerID=" + serverPeer.getConfiguration().getServerPeerID()); }
ConnectionFactoryDelegate delegate = (ConnectionFactoryDelegate)delegates.remove(uniqueName);
@@ -403,7 +402,7 @@
public String toString()
{
- return "Server[" + Configuration.getServerPeerID() + "].ConnFactoryJNDIMapper";
+ return "Server[" + serverPeer.getConfiguration().getServerPeerID() + "].ConnFactoryJNDIMapper";
}
// Package protected ----------------------------------------------------------------------------
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/destination/DestinationDeployer.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/destination/DestinationDeployer.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/destination/DestinationDeployer.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -21,7 +21,6 @@
*/
package org.jboss.jms.server.destination;
-import org.jboss.jms.server.Configuration;
import org.jboss.jms.server.JMSCondition;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.messagecounter.MessageCounter;
@@ -206,14 +205,14 @@
destination.setExpiryQueue(expq);
destination.setSecurityConfig(securityConfig);
destination.setCreatedProgrammatically(true);
- MessagingQueue queue = new MessagingQueue(Configuration.getServerPeerID(), destination.getName(),
+ MessagingQueue queue = new MessagingQueue(serverPeer.getConfiguration().getServerPeerID(), destination.getName(),
serverPeer.getChannelIDManager().getID(),
serverPeer.getMessageStore(), serverPeer.getPersistenceManagerInstance(),
true,
destination.getMaxSize(), null,
destination.getFullSize(), destination.getPageSize(),
destination.getDownCacheSize(), destination.isClustered(),
- Configuration.getRecoverDeliveriesTimeout());
+ serverPeer.getConfiguration().getRecoverDeliveriesTimeout());
boolean added = getServerPeer().getPostOffice().addBinding(new Binding(queueCond, queue, false), false);
if (added)
@@ -279,7 +278,7 @@
if (dayLimitToUse == -1)
{
//Use override on server peer
- dayLimitToUse = Configuration.getDefaultMessageCounterHistoryDayLimit();
+ dayLimitToUse = serverPeer.getConfiguration().getDefaultMessageCounterHistoryDayLimit();
}
MessageCounter counter =
@@ -333,14 +332,14 @@
destination.setFullSize(fullSize);
}
destination.setJndiName(jndiName);
- MessagingQueue queue = new MessagingQueue(Configuration.getServerPeerID(), destination.getName(),
+ MessagingQueue queue = new MessagingQueue(serverPeer.getConfiguration().getServerPeerID(), destination.getName(),
serverPeer.getChannelIDManager().getID(),
serverPeer.getMessageStore(), serverPeer.getPersistenceManagerInstance(),
true,
destination.getMaxSize(), null,
destination.getFullSize(), destination.getPageSize(),
destination.getDownCacheSize(), destination.isClustered(),
- Configuration.getRecoverDeliveriesTimeout());
+ serverPeer.getConfiguration().getRecoverDeliveriesTimeout());
boolean added = serverPeer.getPostOffice().addBinding(new Binding(queueCond, queue, false), false);
if (added)
@@ -413,7 +412,7 @@
if (dayLimitToUse == -1)
{
//Use override on server peer
- dayLimitToUse = Configuration.getDefaultMessageCounterHistoryDayLimit();
+ dayLimitToUse = serverPeer.getConfiguration().getDefaultMessageCounterHistoryDayLimit();
}
MessageCounter counter =
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -29,7 +29,10 @@
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.server.*;
+import org.jboss.jms.server.ConnectionManager;
+import org.jboss.jms.server.JMSCondition;
+import org.jboss.jms.server.SecurityStore;
+import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.endpoint.advised.SessionAdvised;
import org.jboss.jms.tx.ClientTransaction;
import org.jboss.jms.tx.ClientTransaction.SessionTxState;
@@ -378,7 +381,7 @@
{
// Temporary queues must be unbound on ALL nodes of the cluster
- postOffice.removeBinding(dest.getName(), Configuration.isClustered());
+ postOffice.removeBinding(dest.getName(), serverPeer.getConfiguration().isClustered());
}
else
{
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -27,7 +27,6 @@
import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.TopologyResult;
-import org.jboss.jms.server.Configuration;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
@@ -174,7 +173,7 @@
// Wait for server side failover to complete
int failoverNodeID = serverPeer.getFailoverWaiter().waitForFailover(failedNodeID);
- if (failoverNodeID == -1 || failoverNodeID != Configuration.getServerPeerID())
+ if (failoverNodeID == -1 || failoverNodeID != serverPeer.getConfiguration().getServerPeerID())
{
log.trace(this + " realized that we are on the wrong node or no failover has occured");
return new CreateConnectionResult(failoverNodeID);
@@ -266,7 +265,7 @@
// See http://jira.jboss.com/jira/browse/JBMESSAGING-797
synchronized (AspectManager.instance())
{
- return new ClientConnectionDelegate(connectionID, Configuration.getServerPeerID());
+ return new ClientConnectionDelegate(connectionID, serverPeer.getConfiguration().getServerPeerID());
}
}
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -24,7 +24,6 @@
import org.jboss.jms.delegate.ConsumerEndpoint;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.server.Configuration;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.jms.server.messagecounter.MessageCounter;
@@ -103,10 +102,12 @@
private boolean slow;
private volatile boolean dead;
+
+ private ServerPeer sp;
// Constructors ---------------------------------------------------------------------------------
- ServerConsumerEndpoint(String id, Queue messageQueue, String queueName,
+ ServerConsumerEndpoint(ServerPeer sp,String id, Queue messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint, String selector,
boolean noLocal, JBossDestination dest, Queue dlq,
Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts,
@@ -116,7 +117,7 @@
{
log.trace("constructing consumer endpoint " + id);
}
-
+ this.sp = sp;
this.id = id;
this.messageQueue = messageQueue;
@@ -144,7 +145,7 @@
this.startStopLock = new Object();
- this.preserveOrdering = Configuration.isDefaultPreserveOrdering();
+ this.preserveOrdering = sp.getConfiguration().isDefaultPreserveOrdering();
this.replicating = replicating;
@@ -540,7 +541,7 @@
{
//Durable sub consumer
- if (queue.isClustered() && Configuration.isClustered())
+ if (queue.isClustered() && sp.getConfiguration().isClustered())
{
//Clustered durable sub consumer created - we need to remove this info from the replicator
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -33,7 +33,6 @@
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.server.Configuration;
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.JMSCondition;
import org.jboss.jms.server.ServerPeer;
@@ -175,11 +174,11 @@
postOffice = sp.getPostOffice();
- supportsFailover = connectionEndpoint.getConnectionFactoryEndpoint().isSupportsFailover() && Configuration.isClustered();
+ supportsFailover = connectionEndpoint.getConnectionFactoryEndpoint().isSupportsFailover() && sp.getConfiguration().isClustered();
idm = sp.getChannelIDManager();
- nodeId = Configuration.getServerPeerID();
+ nodeId = sp.getConfiguration().getServerPeerID();
tr = sp.getTxRepository();
@@ -193,9 +192,9 @@
tr = sp.getTxRepository();
- defaultMaxDeliveryAttempts = Configuration.getDefaultMaxDeliveryAttempts();
+ defaultMaxDeliveryAttempts = sp.getConfiguration().getDefaultMaxDeliveryAttempts();
- defaultRedeliveryDelay = Configuration.getDefaultRedeliveryDelay();
+ defaultRedeliveryDelay = sp.getConfiguration().getDefaultRedeliveryDelay();
deliveries = new ConcurrentHashMap();
@@ -495,7 +494,7 @@
try
{
- if (!Configuration.isClustered())
+ if (!sp.getConfiguration().isClustered())
{
throw new IllegalStateException("Recovering deliveries but post office is not clustered!");
}
@@ -654,11 +653,11 @@
if (dest.isTopic())
{
- mDest = new ManagedTopic(dest.getName(), fullSize, pageSize, downCacheSize, Configuration.isClustered());
+ mDest = new ManagedTopic(dest.getName(), fullSize, pageSize, downCacheSize, sp.getConfiguration().isClustered());
}
else
{
- mDest = new ManagedQueue(dest.getName(), fullSize, pageSize, downCacheSize, Configuration.isClustered());
+ mDest = new ManagedQueue(dest.getName(), fullSize, pageSize, downCacheSize, sp.getConfiguration().isClustered());
}
mDest.setTemporary(true);
@@ -669,8 +668,8 @@
{
Queue coreQueue = new MessagingQueue(nodeId, dest.getName(),
idm.getID(), ms, pm, false, -1, null,
- fullSize, pageSize, downCacheSize, Configuration.isClustered(),
- Configuration.getRecoverDeliveriesTimeout());
+ fullSize, pageSize, downCacheSize, sp.getConfiguration().isClustered(),
+ sp.getConfiguration().getRecoverDeliveriesTimeout());
Condition cond = new JMSCondition(true, dest.getName());
@@ -678,7 +677,7 @@
// make a binding for this temporary queue
// temporary queues need to bound on ALL nodes of the cluster
- postOffice.addBinding(new Binding(cond, coreQueue, true), Configuration.isClustered());
+ postOffice.addBinding(new Binding(cond, coreQueue, true), sp.getConfiguration().isClustered());
coreQueue.activate();
}
@@ -727,7 +726,7 @@
// temporary queues must be unbound on ALL nodes of the cluster
- postOffice.removeBinding(dest.getName(), Configuration.isClustered());
+ postOffice.removeBinding(dest.getName(), sp.getConfiguration().isClustered());
}
else
{
@@ -801,7 +800,7 @@
//Also if it is clustered we must disallow unsubscribing if it has active consumers on other nodes
- if (sub.isClustered() && Configuration.isClustered())
+ if (sub.isClustered() && sp.getConfiguration().isClustered())
{
Replicator rep = (Replicator)postOffice;
@@ -814,7 +813,7 @@
}
}
- postOffice.removeBinding(sub.getName(), sub.isClustered() && Configuration.isClustered());
+ postOffice.removeBinding(sub.getName(), sub.isClustered() && sp.getConfiguration().isClustered());
String counterName = ManagedDestination.SUBSCRIPTION_MESSAGECOUNTER_PREFIX + sub.getName();
@@ -1792,7 +1791,7 @@
//We don't care about redelivery delays and number of attempts for a direct consumer
ServerConsumerEndpoint ep =
- new ServerConsumerEndpoint(consumerID, binding.queue,
+ new ServerConsumerEndpoint(sp, consumerID, binding.queue,
binding.queue.getName(), this, selectorString, false,
dest, null, null, 0, -1, true, false);
@@ -1891,7 +1890,7 @@
mDest.getPageSize(),
mDest.getDownCacheSize(),
mDest.isClustered(),
- Configuration.getRecoverDeliveriesTimeout());
+ sp.getConfiguration().getRecoverDeliveriesTimeout());
JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
@@ -1905,7 +1904,7 @@
if (dayLimitToUse == -1)
{
//Use override on server peer
- dayLimitToUse = Configuration.getDefaultMessageCounterHistoryDayLimit();
+ dayLimitToUse = sp.getConfiguration().getDefaultMessageCounterHistoryDayLimit();
}
//We don't create message counters on temp topics
@@ -1949,12 +1948,12 @@
mDest.getPageSize(),
mDest.getDownCacheSize(),
mDest.isClustered(),
- Configuration.getRecoverDeliveriesTimeout());
+ sp.getConfiguration().getRecoverDeliveriesTimeout());
// Durable subs must be bound on ALL nodes of the cluster (if clustered)
postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true),
- Configuration.isClustered() && mDest.isClustered());
+ sp.getConfiguration().isClustered() && mDest.isClustered());
queue.activate();
@@ -2028,7 +2027,7 @@
// Durable subs must be unbound on ALL nodes of the cluster
- postOffice.removeBinding(queue.getName(), Configuration.isClustered() && mDest.isClustered());
+ postOffice.removeBinding(queue.getName(), sp.getConfiguration().isClustered() && mDest.isClustered());
// create a fresh new subscription
@@ -2038,12 +2037,12 @@
mDest.getPageSize(),
mDest.getDownCacheSize(),
mDest.isClustered(),
- Configuration.getRecoverDeliveriesTimeout());
+ sp.getConfiguration().getRecoverDeliveriesTimeout());
// Durable subs must be bound on ALL nodes of the cluster
postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true),
- Configuration.isClustered() && mDest.isClustered());
+ sp.getConfiguration().isClustered() && mDest.isClustered());
queue.activate();
@@ -2093,12 +2092,12 @@
boolean replicating = supportsFailover && queue.isClustered() && !(jmsDestination.isTopic() && !queue.isRecoverable());
ServerConsumerEndpoint ep =
- new ServerConsumerEndpoint(consumerID, queue,
+ new ServerConsumerEndpoint(sp, consumerID, queue,
queue.getName(), this, selectorString, noLocal,
jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelayToUse,
maxDeliveryAttemptsToUse, false, replicating);
- if (queue.isClustered() && Configuration.isClustered() && jmsDestination.isTopic() && subscriptionName != null)
+ if (queue.isClustered() && sp.getConfiguration().isClustered() && jmsDestination.isTopic() && subscriptionName != null)
{
//Clustered durable sub consumer created - we need to add this info in the replicator - it is needed by other nodes
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -21,8 +21,8 @@
*/
package org.jboss.jms.server.security;
-import org.jboss.jms.server.Configuration;
import org.jboss.jms.server.SecurityStore;
+import org.jboss.jms.server.ServerPeer;
import org.jboss.logging.Logger;
import org.jboss.security.AuthenticationManager;
import org.jboss.security.RealmMapping;
@@ -73,12 +73,15 @@
private String suckerPassword;
+ private ServerPeer serverPeer;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SecurityMetadataStore()
+ public SecurityMetadataStore(ServerPeer serverPeer)
{
+ this.serverPeer = serverPeer;
queueSecurityConf = new HashMap();
topicSecurityConf = new HashMap();
}
@@ -93,12 +96,12 @@
if (m == null)
{
// No SecurityMetadata was configured for the destination, apply the default
- if (Configuration.getSecurityConfig() != null)
+ if (serverPeer.getConfiguration().getSecurityConfig() != null)
{
log.debug("No SecurityMetadadata was available for " + destName + ", using default security config");
try
{
- m = new SecurityMetadata(Configuration.getSecurityConfig());
+ m = new SecurityMetadata(serverPeer.getConfiguration().getSecurityConfig());
}
catch (Exception e)
{
@@ -238,7 +241,7 @@
try
{
- Object mgr = ic.lookup(Configuration.getSecurityDomain());
+ Object mgr = ic.lookup(serverPeer.getConfiguration().getSecurityDomain());
log.debug("JaasSecurityManager is " + mgr);
@@ -250,12 +253,12 @@
catch (NamingException e)
{
// Apparently there is no security context, try adding java:/jaas
- log.warn("Failed to lookup securityDomain " + Configuration.getSecurityDomain(), e);
+ log.warn("Failed to lookup securityDomain " + serverPeer.getConfiguration().getSecurityDomain(), e);
- if (!Configuration.getSecurityDomain().startsWith("java:/jaas/"))
+ if (!serverPeer.getConfiguration().getSecurityDomain().startsWith("java:/jaas/"))
{
authenticationManager =
- (SubjectSecurityManager)ic.lookup("java:/jaas/" + Configuration.getSecurityDomain());
+ (SubjectSecurityManager)ic.lookup("java:/jaas/" + serverPeer.getConfiguration().getSecurityDomain());
}
else
{
Modified: branches/Branch_MC_Integration_New/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_MC_Integration_New/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -191,6 +191,8 @@
//overwhelming JGroups
private ClearableSemaphore replicateSemaphore;
+ private Configuration configuration;
+
// Constructors ---------------------------------------------------------------------------------
public MessagingPostOffice()
@@ -223,31 +225,31 @@
loadedBindings = getBindingsFromStorage();
- if (Configuration.isClustered())
+ if (configuration.isClustered())
{
- ChannelFactory jChannelFactory = new MultiplexerChannelFactory(channelFactory, Configuration.getChannelPartitionName(),
- Configuration.getControlChannelName(), Configuration.getDataChannelName());
- groupMember = new GroupMember(Configuration.getGroupName(), Configuration.getStateTimeout(), Configuration.getCastTimeout(), jChannelFactory, this, this);
- replicateSemaphore = new ClearableSemaphore(Configuration.getMaxConcurrentReplications());
+ ChannelFactory jChannelFactory = new MultiplexerChannelFactory(channelFactory, configuration.getChannelPartitionName(),
+ configuration.getControlChannelName(), configuration.getDataChannelName());
+ groupMember = new GroupMember(configuration.getGroupName(), configuration.getStateTimeout(), configuration.getCastTimeout(), jChannelFactory, this, this);
+ replicateSemaphore = new ClearableSemaphore(configuration.getMaxConcurrentReplications());
- log.debug("Using JGroups flow control workaround: " + Configuration.isUseJGroupsWorkaround());
+ log.debug("Using JGroups flow control workaround: " + configuration.isUseJGroupsWorkaround());
nbSupport = new NotificationBroadcasterSupport();
groupMember.start();
//Sanity check - we check there aren't any other nodes already in the cluster with the same node id
- if (knowAboutNodeId(Configuration.getServerPeerID()))
+ if (knowAboutNodeId(configuration.getServerPeerID()))
{
throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
- "cluster with the same node id (" + Configuration.getServerPeerID() + "). " +
+ "cluster with the same node id (" + configuration.getServerPeerID() + "). " +
"Are you sure you have given each node a unique node id during installation?");
}
PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getSyncAddress(), groupMember.getAsyncAddress());
- nodeIDAddressMap.put(new Integer(Configuration.getServerPeerID()), info);
+ nodeIDAddressMap.put(new Integer(configuration.getServerPeerID()), info);
//calculate the failover map
calculateFailoverMap();
@@ -257,7 +259,7 @@
//add our vm identifier to the replicator
put(Replicator.JVM_ID_KEY, clientVMId);
- groupMember.multicastControl(new JoinClusterRequest(Configuration.getServerPeerID(), info), true);
+ groupMember.multicastControl(new JoinClusterRequest(configuration.getServerPeerID(), info), true);
}
//Now load the bindings for this node
@@ -282,10 +284,10 @@
super.stop();
- if (Configuration.isClustered())
+ if (configuration.isClustered())
{
//Need to send this *before* stopping
- groupMember.multicastControl(new LeaveClusterRequest(Configuration.getServerPeerID()), true);
+ groupMember.multicastControl(new LeaveClusterRequest(configuration.getServerPeerID()), true);
groupMember.stop();
}
@@ -328,7 +330,7 @@
boolean added = internalAddBinding(binding, allNodes, true);
- if (added && allNodes && Configuration.isClustered() && binding.queue.isClustered())
+ if (added && allNodes && configuration.isClustered() && binding.queue.isClustered())
{
//Now we must wait for all the bindings to appear in state
//This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
@@ -348,7 +350,7 @@
{
Binding binding = internalRemoveBinding(queueName, allNodes, true);
- if (binding != null && allNodes && Configuration.isClustered() && binding.queue.isClustered())
+ if (binding != null && allNodes && configuration.isClustered() && binding.queue.isClustered())
{
//Now we must wait for all the bindings to be removed from state
//This is necessary since the second unbind in an all unbind is sent asynchronously to avoid deadlock
@@ -381,7 +383,7 @@
throw new IllegalArgumentException("Condition is null");
}
- if (!localOnly && !Configuration.isClustered())
+ if (!localOnly && !configuration.isClustered())
{
throw new IllegalArgumentException("Cannot request clustered queues on non clustered post office");
}
@@ -408,7 +410,7 @@
{
Queue queue = (Queue)iter.next();
- if (!localOnly || (queue.getNodeID() == Configuration.getServerPeerID()))
+ if (!localOnly || (queue.getNodeID() == configuration.getServerPeerID()))
{
list.add(queue);
}
@@ -503,7 +505,7 @@
//This is to prevent overwhelming JGroups
//See http://jira.jboss.com/jira/browse/JBMESSAGING-1112
- if (reply && Configuration.isUseJGroupsWorkaround())
+ if (reply && configuration.isUseJGroupsWorkaround())
{
//We timeout to avoid locking the system in event of failure
boolean ok = replicateSemaphore.tryAcquire(SEMAPHORE_ACQUIRE_TIMEOUT);
@@ -527,12 +529,12 @@
{
//TODO optimise this
- PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(Configuration.getServerPeerID()));
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(configuration.getServerPeerID()));
replyAddress = info.getDataChannelAddress();
}
- ClusterRequest request = new ReplicateDeliveryMessage(Configuration.getServerPeerID(), queueName, sessionID, messageID, deliveryID, replyAddress);
+ ClusterRequest request = new ReplicateDeliveryMessage(configuration.getServerPeerID(), queueName, sessionID, messageID, deliveryID, replyAddress);
if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
@@ -560,7 +562,7 @@
//There is no need to lock this while failover node change is occuring since the receiving node is tolerant to duplicate
//adds or acks
- ClusterRequest request = new ReplicateAckMessage(Configuration.getServerPeerID(), queueName, messageID);
+ ClusterRequest request = new ReplicateAckMessage(configuration.getServerPeerID(), queueName, messageID);
Address address = getFailoverNodeDataChannelAddress();
@@ -660,7 +662,7 @@
long channelID = channelIDManager.getID();
- Queue queue2 = new MessagingQueue(Configuration.getServerPeerID(), mapping.getQueueName(), channelID, ms, pm,
+ Queue queue2 = new MessagingQueue(configuration.getServerPeerID(), mapping.getQueueName(), channelID, ms, pm,
mapping.isRecoverable(), mapping.getMaxSize(), filter,
mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(),
true, mapping.getRecoverDeliveriesTimeout());
@@ -780,7 +782,7 @@
if (trace) { log.trace("First node is now " + firstNode); }
- if (firstNode && Configuration.isUseJGroupsWorkaround())
+ if (firstNode && configuration.isUseJGroupsWorkaround())
{
//If we are now the first node in the cluster then any outstanding replication requests will not get responses
//so we must release these and we have no more need of a semaphore until another node joins
@@ -823,7 +825,7 @@
throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
}
- if (fnodeID.intValue() == Configuration.getServerPeerID())
+ if (fnodeID.intValue() == configuration.getServerPeerID())
{
// The node crashed and we are the failover node so let's perform failover
@@ -901,7 +903,7 @@
long channelID = channelIDManager.getID();
- Queue queue2 = new MessagingQueue(Configuration.getServerPeerID(), mapping.getQueueName(), channelID, ms, pm,
+ Queue queue2 = new MessagingQueue(configuration.getServerPeerID(), mapping.getQueueName(), channelID, ms, pm,
mapping.isRecoverable(), mapping.getMaxSize(), filter,
mapping.getFullSize(), mapping.getPageSize(), mapping.getDownCacheSize(), true,
mapping.getRecoverDeliveriesTimeout());
@@ -979,7 +981,7 @@
calculateFailoverMap();
- if (wasFirstNode && Configuration.isUseJGroupsWorkaround())
+ if (wasFirstNode && configuration.isUseJGroupsWorkaround())
{
//If we were the first node but now another node has joined - we need to re-enable the semaphore
replicateSemaphore.enable();
@@ -1191,7 +1193,7 @@
//TODO - this does not belong here
final ServerSessionEndpoint session = serverPeer.getSession(sessionID);
- if (Configuration.isUseJGroupsWorkaround())
+ if (configuration.isUseJGroupsWorkaround())
{
replicateSemaphore.release();
}
@@ -1297,9 +1299,9 @@
public void put(Serializable key, Serializable replicant) throws Exception
{
- putReplicantLocally(Configuration.getServerPeerID(), key, replicant);
+ putReplicantLocally(configuration.getServerPeerID(), key, replicant);
- PutReplicantRequest request = new PutReplicantRequest(Configuration.getServerPeerID(), key, replicant);
+ PutReplicantRequest request = new PutReplicantRequest(configuration.getServerPeerID(), key, replicant);
groupMember.multicastControl(request, true);
}
@@ -1316,9 +1318,9 @@
public boolean remove(Serializable key) throws Exception
{
- if (removeReplicantLocally(Configuration.getServerPeerID(), key))
+ if (removeReplicantLocally(configuration.getServerPeerID(), key))
{
- RemoveReplicantRequest request = new RemoveReplicantRequest(Configuration.getServerPeerID(), key);
+ RemoveReplicantRequest request = new RemoveReplicantRequest(configuration.getServerPeerID(), key);
groupMember.multicastControl(request, true);
@@ -1498,7 +1500,7 @@
nodeIDAddressMap = new ConcurrentHashMap();
- if (Configuration.isClustered())
+ if (configuration.isClustered())
{
replicatedData = new HashMap();
@@ -1512,7 +1514,7 @@
replicateResponseExecutor = new QueuedExecutor(new LinkedQueue());
- replicateSemaphore = new ClearableSemaphore(Configuration.getMaxConcurrentReplications());
+ replicateSemaphore = new ClearableSemaphore(configuration.getMaxConcurrentReplications());
}
@@ -1526,7 +1528,7 @@
nodeIDAddressMap = null;
- if (Configuration.isClustered())
+ if (configuration.isClustered())
{
replicatedData = null;
@@ -1542,11 +1544,11 @@
private void requestDeliveries(Queue queue) throws Exception
{
- if (!firstNode && supportsFailover && Configuration.isClustered() && queue.isClustered())
+ if (!firstNode && supportsFailover && configuration.isClustered() && queue.isClustered())
{
// reverse lookup in failover map
- Integer masterNodeID = getMasterForFailoverNodeID(Configuration.getServerPeerID());
+ Integer masterNodeID = getMasterForFailoverNodeID(configuration.getServerPeerID());
if (masterNodeID != null)
{
@@ -1565,7 +1567,7 @@
dumpFailoverMap(this.failoverMap);
- PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(Configuration.getServerPeerID()));
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(configuration.getServerPeerID()));
Address replyAddress = info.getDataChannelAddress();
@@ -1716,7 +1718,7 @@
private boolean internalAddBinding(Binding binding, boolean allNodes, boolean sync) throws Exception
{
- if (trace) { log.trace(Configuration.getServerPeerID() + " binding " + binding.queue + " with condition " + binding.condition + " all nodes " + allNodes); }
+ if (trace) { log.trace(configuration.getServerPeerID() + " binding " + binding.queue + " with condition " + binding.condition + " all nodes " + allNodes); }
if (binding == null)
{
@@ -1732,7 +1734,7 @@
throw new IllegalArgumentException("Queue is null");
}
- if (queue.getNodeID() != Configuration.getServerPeerID())
+ if (queue.getNodeID() != configuration.getServerPeerID())
{
throw new IllegalArgumentException("Cannot bind a queue from another node");
}
@@ -1753,11 +1755,11 @@
insertBindingInStorage(condition, queue, binding.allNodes);
}
- if (Configuration.isClustered() && queue.isClustered())
+ if (configuration.isClustered() && queue.isClustered())
{
String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
- MappingInfo info = new MappingInfo(Configuration.getServerPeerID(), queue.getName(), condition.toText(), filterString, queue.getChannelID(),
+ MappingInfo info = new MappingInfo(configuration.getServerPeerID(), queue.getName(), condition.toText(), filterString, queue.getChannelID(),
queue.isRecoverable(), true,
binding.allNodes,
queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
@@ -1775,14 +1777,14 @@
private Binding internalRemoveBinding(String queueName, boolean allNodes, boolean sync) throws Throwable
{
- if (trace) { log.trace(Configuration.getServerPeerID() + " unbind queue: " + queueName + " all nodes " + allNodes); }
+ if (trace) { log.trace(configuration.getServerPeerID() + " unbind queue: " + queueName + " all nodes " + allNodes); }
if (queueName == null)
{
throw new IllegalArgumentException("Queue name is null");
}
- Binding removed = removeBindingInMemory(Configuration.getServerPeerID(), queueName);
+ Binding removed = removeBindingInMemory(configuration.getServerPeerID(), queueName);
//The queue might not be removed (it's already removed) if two unbind all requests are sent simultaneously on the cluster
if (removed != null)
@@ -1798,11 +1800,11 @@
deleteBindingFromStorage(queue);
}
- if (Configuration.isClustered() && queue.isClustered())
+ if (configuration.isClustered() && queue.isClustered())
{
String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
- MappingInfo info = new MappingInfo(Configuration.getServerPeerID(), queue.getName(), condition.toText(), filterString, queue.getChannelID(),
+ MappingInfo info = new MappingInfo(configuration.getServerPeerID(), queue.getName(), condition.toText(), filterString, queue.getChannelID(),
queue.isRecoverable(), true, allNodes);
UnbindRequest request = new UnbindRequest(info, allNodes);
@@ -1859,11 +1861,11 @@
failoverMap.put(theNodeID, failoverNodeID);
}
- int fid = ((Integer)failoverMap.get(new Integer(Configuration.getServerPeerID()))).intValue();
+ int fid = ((Integer)failoverMap.get(new Integer(configuration.getServerPeerID()))).intValue();
//if we are the first node in the cluster we don't want to be our own failover node!
- if (fid == Configuration.getServerPeerID())
+ if (fid == configuration.getServerPeerID())
{
firstNode = true;
failoverNodeID = -1;
@@ -1969,7 +1971,7 @@
if (trace) { log.trace(this + " considering queue " + queue); }
- if (queue.getNodeID() == Configuration.getServerPeerID())
+ if (queue.getNodeID() == configuration.getServerPeerID())
{
if (trace) { log.trace(this + " is a local queue"); }
@@ -2256,7 +2258,7 @@
nameMaps.put(nid, nameMap);
- if (queue.getNodeID() == Configuration.getServerPeerID())
+ if (queue.getNodeID() == configuration.getServerPeerID())
{
localNameMap = nameMap;
}
@@ -2342,9 +2344,9 @@
{
ps = conn.prepareStatement(getSQLStatement("LOAD_BINDINGS"));
- ps.setString(1, Configuration.getPostOfficeName());
+ ps.setString(1, configuration.getPostOfficeName());
- ps.setInt(2, Configuration.getServerPeerID());
+ ps.setInt(2, configuration.getServerPeerID());
rs = ps.executeQuery();
@@ -2374,8 +2376,8 @@
filter = filterFactory.createFilter(selector);
}
- Queue queue = new MessagingQueue(Configuration.getServerPeerID(), queueName, channelID, ms, pm,
- true, filter, bindingClustered && Configuration.isClustered());
+ Queue queue = new MessagingQueue(configuration.getServerPeerID(), queueName, channelID, ms, pm,
+ true, filter, bindingClustered && configuration.isClustered());
if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
@@ -2413,11 +2415,11 @@
Queue queue = binding.queue;
//Need to broadcast it too
- if (Configuration.isClustered() && queue.isClustered())
+ if (configuration.isClustered() && queue.isClustered())
{
String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
- MappingInfo info = new MappingInfo(Configuration.getServerPeerID(), queue.getName(), binding.condition.toText(), filterString, queue.getChannelID(),
+ MappingInfo info = new MappingInfo(configuration.getServerPeerID(), queue.getName(), binding.condition.toText(), filterString, queue.getChannelID(),
queue.isRecoverable(), true,
binding.allNodes,
queue.getFullSize(), queue.getPageSize(), queue.getDownCacheSize(),
@@ -2446,8 +2448,8 @@
{
ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
- ps.setString(1, Configuration.getPostOfficeName());
- ps.setInt(2, Configuration.getServerPeerID());
+ ps.setString(1, configuration.getPostOfficeName());
+ ps.setInt(2, configuration.getServerPeerID());
ps.setString(3, queue.getName());
ps.setString(4, condition.toText());
String filterString = queue.getFilter() != null ? queue.getFilter().getFilterString() : null;
@@ -2503,7 +2505,7 @@
{
ps = conn.prepareStatement(getSQLStatement("DELETE_BINDING"));
- ps.setString(1, Configuration.getPostOfficeName());
+ ps.setString(1, configuration.getPostOfficeName());
ps.setInt(2, queue.getNodeID());
ps.setString(3, queue.getName());
@@ -2711,7 +2713,7 @@
{
if (trace) { log.trace("Old failover node still exists, telling it remove replicated deliveries"); }
- ClusterRequest request = new AckAllReplicatedDeliveriesMessage(Configuration.getServerPeerID());
+ ClusterRequest request = new AckAllReplicatedDeliveriesMessage(configuration.getServerPeerID());
groupMember.unicastData(request, info.getDataChannelAddress());
@@ -2760,7 +2762,7 @@
throw new IllegalStateException("Cannot find address for failover node " + failoverNodeID);
}
- ClusterRequest request = new AddAllReplicatedDeliveriesMessage(Configuration.getServerPeerID(), deliveries);
+ ClusterRequest request = new AddAllReplicatedDeliveriesMessage(configuration.getServerPeerID(), deliveries);
groupMember.unicastData(request, info.getDataChannelAddress());
@@ -2790,7 +2792,7 @@
log.debug(this + " announced it is starting failover procedure");
- pm.mergeTransactions(failedNodeID.intValue(), Configuration.getServerPeerID());
+ pm.mergeTransactions(failedNodeID.intValue(), configuration.getServerPeerID());
// Need to lock
lock.writeLock().acquire();
@@ -2952,6 +2954,16 @@
this.clusterNotifier = clusterNotifier;
}
+ public Configuration getconfiguration()
+ {
+ return configuration;
+ }
+
+ public void setconfiguration(Configuration configuration)
+ {
+ this.configuration = configuration;
+ }
+
// Inner classes --------------------------------------------------------------------------------
private class SendReplicatedDeliveriesRunnable implements Runnable
@@ -2995,7 +3007,7 @@
if (gotSome)
{
- ClusterRequest req = new AddAllReplicatedDeliveriesMessage(Configuration.getServerPeerID(), dels);
+ ClusterRequest req = new AddAllReplicatedDeliveriesMessage(configuration.getServerPeerID(), dels);
groupMember.unicastData(req, address);
}
Modified: branches/Branch_MC_Integration_New/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
===================================================================
--- branches/Branch_MC_Integration_New/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java 2007-11-07 13:56:18 UTC (rev 3294)
+++ branches/Branch_MC_Integration_New/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java 2007-11-07 14:03:57 UTC (rev 3295)
@@ -9,7 +9,6 @@
import org.jboss.aop.advice.Interceptor;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.server.Configuration;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
@@ -252,7 +251,7 @@
{
try
{
- int serverId = Configuration.getServerPeerID();
+ int serverId = getServerPeer(target).getConfiguration().getServerPeerID();
//First unregister from the RMI registry
Registry registry = LocateRegistry.getRegistry(RMITestServer.DEFAULT_REGISTRY_PORT);
More information about the jboss-cvs-commits
mailing list