JBoss hornetq SVN: r11743 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: remoting/impl/netty and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-22 16:37:07 -0500 (Tue, 22 Nov 2011)
New Revision: 11743
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
Log:
avoiding deadlocks through stomp / close session / exception handling
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-11-22 21:35:33 UTC (rev 11742)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-11-22 21:37:07 UTC (rev 11743)
@@ -17,6 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -66,6 +67,8 @@
private final Object failLock = new Object();
+ private final Executor executor;
+
private volatile boolean dataReceived;
public StompDecoder getDecoder()
@@ -73,7 +76,7 @@
return decoder;
}
- StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager)
+ StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager, final Executor executor)
{
this.transportConnection = transportConnection;
@@ -82,6 +85,8 @@
this.creationTime = System.currentTimeMillis();
this.acceptorUsed = acceptorUsed;
+
+ this.executor = executor;
}
public void addFailureListener(final FailureListener listener)
@@ -322,7 +327,6 @@
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
-
for (final FailureListener listener : listenersClone)
{
try
@@ -343,20 +347,26 @@
{
final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
- for (final CloseListener listener : listenersClone)
- {
- try
+ // avoiding a dead lock
+ executor.execute(new Runnable(){
+ public void run()
{
- listener.connectionClosed();
+ for (final CloseListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionClosed();
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
}
- catch (final Throwable t)
- {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- log.error("Failed to execute failure listener", t);
- }
- }
+ });
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-22 21:35:33 UTC (rev 11742)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-22 21:37:07 UTC (rev 11743)
@@ -120,7 +120,7 @@
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
- StompConnection conn = new StompConnection(acceptorUsed, connection, this);
+ StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getExecutorFactory().getExecutor());
// Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2011-11-22 21:35:33 UTC (rev 11742)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2011-11-22 21:37:07 UTC (rev 11743)
@@ -96,19 +96,20 @@
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) throws Exception
{
- synchronized (this)
+ if (!active)
{
- if (!active)
- {
- return;
- }
+ return;
+ }
- // We don't want to log this - since it is normal for this to happen during failover/reconnect
- // and we don't want to spew out stack traces in that event
- // The user has access to this exeception anyway via the HornetQException initial cause
+ // We don't want to log this - since it is normal for this to happen during failover/reconnect
+ // and we don't want to spew out stack traces in that event
+ // The user has access to this exeception anyway via the HornetQException initial cause
- HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR, "Netty exception");
- me.initCause(e.getCause());
+ HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR, "Netty exception");
+ me.initCause(e.getCause());
+
+ synchronized (listener)
+ {
try
{
listener.connectionException(e.getChannel().getId(), me);
13 years, 1 month
JBoss hornetq SVN: r11742 - branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/impl/netty.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-22 16:35:33 -0500 (Tue, 22 Nov 2011)
New Revision: 11742
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
Log:
avoiding deadlocks
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2011-11-22 19:31:59 UTC (rev 11741)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java 2011-11-22 21:35:33 UTC (rev 11742)
@@ -96,19 +96,20 @@
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) throws Exception
{
- synchronized (this)
+ if (!active)
{
- if (!active)
- {
- return;
- }
+ return;
+ }
- // We don't want to log this - since it is normal for this to happen during failover/reconnect
- // and we don't want to spew out stack traces in that event
- // The user has access to this exeception anyway via the HornetQException initial cause
+ // We don't want to log this - since it is normal for this to happen during failover/reconnect
+ // and we don't want to spew out stack traces in that event
+ // The user has access to this exeception anyway via the HornetQException initial cause
- HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR, "Netty exception");
- me.initCause(e.getCause());
+ HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR, "Netty exception");
+ me.initCause(e.getCause());
+
+ synchronized (listener)
+ {
try
{
listener.connectionException(e.getChannel().getId(), me);
13 years, 1 month
JBoss hornetq SVN: r11741 - branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-22 14:31:59 -0500 (Tue, 22 Nov 2011)
New Revision: 11741
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
avoiding deadlocks through stomp / close session
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-11-22 17:23:39 UTC (rev 11740)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-11-22 19:31:59 UTC (rev 11741)
@@ -17,6 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -66,6 +67,8 @@
private final Object failLock = new Object();
+ private final Executor executor;
+
private volatile boolean dataReceived;
public StompDecoder getDecoder()
@@ -73,7 +76,7 @@
return decoder;
}
- StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager)
+ StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager, final Executor executor)
{
this.transportConnection = transportConnection;
@@ -82,6 +85,8 @@
this.creationTime = System.currentTimeMillis();
this.acceptorUsed = acceptorUsed;
+
+ this.executor = executor;
}
public void addFailureListener(final FailureListener listener)
@@ -322,7 +327,6 @@
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
-
for (final FailureListener listener : listenersClone)
{
try
@@ -343,20 +347,26 @@
{
final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
- for (final CloseListener listener : listenersClone)
- {
- try
+ // avoiding a dead lock
+ executor.execute(new Runnable(){
+ public void run()
{
- listener.connectionClosed();
+ for (final CloseListener listener : listenersClone)
+ {
+ try
+ {
+ listener.connectionClosed();
+ }
+ catch (final Throwable t)
+ {
+ // Failure of one listener to execute shouldn't prevent others
+ // from
+ // executing
+ log.error("Failed to execute failure listener", t);
+ }
+ }
}
- catch (final Throwable t)
- {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- log.error("Failed to execute failure listener", t);
- }
- }
+ });
}
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-22 17:23:39 UTC (rev 11740)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-22 19:31:59 UTC (rev 11741)
@@ -120,7 +120,7 @@
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
- StompConnection conn = new StompConnection(acceptorUsed, connection, this);
+ StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getExecutorFactory().getExecutor());
// Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!
13 years, 1 month
JBoss hornetq SVN: r11740 - in branches/HORNETQ-316: hornetq-core/src/main/java/org/hornetq/api/core and 18 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-11-22 12:23:39 -0500 (Tue, 22 Nov 2011)
New Revision: 11740
Added:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnector.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/UDPDiscoveryClusterConnectorImpl.java
branches/HORNETQ-316/hornetq-jgroups-discovery/
branches/HORNETQ-316/hornetq-jgroups-discovery/pom.xml
branches/HORNETQ-316/hornetq-jgroups-discovery/src/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryClusterConnectorImpl.java
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
Removed:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConfiguration.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/BroadcastGroup.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd
branches/HORNETQ-316/pom.xml
Log:
https://issues.jboss.org/browse/HORNETQ-316 implement pluggable cluster discovery
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -14,15 +14,13 @@
package org.hornetq.api.core;
import java.io.Serializable;
+import java.util.Map;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.utils.UUIDGenerator;
-
/**
* A DiscoveryGroupConfiguration
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 18 Nov 2008 08:47:30
*
*
@@ -30,121 +28,61 @@
public class DiscoveryGroupConfiguration implements Serializable
{
private static final long serialVersionUID = 8657206421727863400L;
-
- private String name;
-
- private String localBindAddress;
- private String groupAddress;
+ private String serverLocatorClassName;
- private int groupPort;
+ private String clusterConnectorClassName;
- private long refreshTimeout;
-
- private long discoveryInitialWaitTimeout;
+ private String name;
- public DiscoveryGroupConfiguration(final String name,
- final String localBindAddress,
- final String groupAddress,
- final int groupPort,
- final long refreshTimeout,
- final long discoveryInitialWaitTimeout)
+ private final Map<String, Object> params;
+
+ public DiscoveryGroupConfiguration(final String serverLocatorClassName,
+ final String clusterConnectorClassName,
+ final Map<String, Object> params,
+ final String name)
{
+ this.serverLocatorClassName = serverLocatorClassName;
+ this.clusterConnectorClassName = clusterConnectorClassName;
this.name = name;
- this.groupAddress = groupAddress;
- this.localBindAddress = localBindAddress;
- this.groupPort = groupPort;
- this.refreshTimeout = refreshTimeout;
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
+ this.params = params;
}
- public DiscoveryGroupConfiguration(final String groupAddress,
- final int groupPort)
- {
- this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAddress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
- }
-
public String getName()
{
return name;
}
-
- public String getLocalBindAddress()
- {
- return localBindAddress;
- }
- public String getGroupAddress()
+ public String getServerLocatorClassName()
{
- return groupAddress;
+ return serverLocatorClassName;
}
- public int getGroupPort()
+ public String getClusterConnectorClassName()
{
- return groupPort;
+ return clusterConnectorClassName;
}
- public long getRefreshTimeout()
+ public Map<String, Object> getParams()
{
- return refreshTimeout;
+ return params;
}
- /**
- * @param name the name to set
- */
public void setName(final String name)
{
this.name = name;
}
-
- /**
- * @param localBindAddress the localBindAddress to set
- */
- public void setLocalBindAdress(final String localBindAddress)
- {
- this.localBindAddress = localBindAddress;
- }
- /**
- * @param groupAddress the groupAddress to set
- */
- public void setGroupAddress(final String groupAddress)
+ public void setServerLocatorClassName(String name)
{
- this.groupAddress = groupAddress;
+ this.serverLocatorClassName = name;
}
- /**
- * @param groupPort the groupPort to set
- */
- public void setGroupPort(final int groupPort)
+ public void setClusterConnectorClassName(String name)
{
- this.groupPort = groupPort;
+ this.clusterConnectorClassName = name;
}
- /**
- * @param refreshTimeout the refreshTimeout to set
- */
- public void setRefreshTimeout(final long refreshTimeout)
- {
- this.refreshTimeout = refreshTimeout;
- }
-
- /**
- * @return the discoveryInitialWaitTimeout
- */
- public long getDiscoveryInitialWaitTimeout()
- {
- return discoveryInitialWaitTimeout;
- }
-
- /**
- * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout to set
- */
- public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
- {
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
- }
-
@Override
public boolean equals(Object o)
{
@@ -153,14 +91,21 @@
DiscoveryGroupConfiguration that = (DiscoveryGroupConfiguration) o;
- if (discoveryInitialWaitTimeout != that.discoveryInitialWaitTimeout) return false;
- if (groupPort != that.groupPort) return false;
- if (refreshTimeout != that.refreshTimeout) return false;
- if (groupAddress != null ? !groupAddress.equals(that.groupAddress) : that.groupAddress != null) return false;
- if (localBindAddress != null ? !localBindAddress.equals(that.localBindAddress) : that.localBindAddress != null)
- return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (serverLocatorClassName != null ? !serverLocatorClassName.equals(that.serverLocatorClassName)
+ : that.serverLocatorClassName != null)
+ return false;
+ if (params == null && that.params != null)
+ return false;
+ if (params.keySet().size() != that.params.keySet().size())
+ return false;
+ for (String key : params.keySet())
+ {
+ if (!params.get(key).equals(that.params.get(key)))
+ return false;
+ }
+
return true;
}
@@ -168,11 +113,10 @@
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (localBindAddress != null ? localBindAddress.hashCode() : 0);
- result = 31 * result + (groupAddress != null ? groupAddress.hashCode() : 0);
- result = 31 * result + groupPort;
- result = 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>> 32));
- result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32));
+ for (String key : params.keySet())
+ {
+ result = 31 * result + (params.get(key) != null ? params.get(key).hashCode() : 0);
+ }
return result;
}
@@ -182,19 +126,17 @@
@Override
public String toString()
{
- return "DiscoveryGroupConfiguration [discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout +
- ", groupAddress=" +
- groupAddress +
- ", groupPort=" +
- groupPort +
- ", localBindAddress=" +
- localBindAddress +
- ", name=" +
- name +
- ", refreshTimeout=" +
- refreshTimeout +
- "]";
+ StringBuilder str =
+ new StringBuilder().append("DiscoveryGroupConfiguration [serverLocatorClassName=")
+ .append(serverLocatorClassName)
+ .append(", name=")
+ .append(name);
+ for (String key : params.keySet())
+ {
+ str.append(", ").append(key).append("=").append(params.get(key));
+ }
+ return str.append("]").toString();
}
-
-
+
+
}
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/DiscoveryGroupConstants.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
+ *
+ */
+public class DiscoveryGroupConstants
+{
+ // for static connector
+ public static final String STATIC_CONNECTOR_NAMES_NAME = "static-connector-names";
+ public static final String STATIC_CONNECTOR_CONFIG_LIST_NAME = "static-connector-list";
+
+ // for UDP discovery
+ public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+ public static final String GROUP_ADDRESS_NAME = "group-address";
+ public static final String GROUP_PORT_NAME = "group-port";
+ public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+ public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -12,22 +12,31 @@
*/
package org.hornetq.api.core.client;
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.UUIDGenerator;
/**
* Utility class for creating HornetQ {@link ClientSessionFactory} objects.
- *
+ *
* Once a {@link ClientSessionFactory} has been created, it can be further configured
* using its setter methods before creating the sessions. Once a session is created,
* the factory can no longer be modified (its setter methods will throw a {@link IllegalStateException}.
- *
+ *
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class HornetQClient
{
+ private static final Logger log = Logger.getLogger(HornetQClient.class);
+
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = RoundRobinConnectionLoadBalancingPolicy.class.getCanonicalName();
public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 30000;
@@ -37,9 +46,9 @@
public static final long DEFAULT_CONNECTION_TTL = 1 * 60 * 1000;
// Any message beyond this size is considered a large message (to be sent in chunks)
-
+
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
-
+
public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
@@ -81,7 +90,7 @@
public static final int DEFAULT_RECONNECT_ATTEMPTS = 0;
public static final int INITIAL_CONNECT_ATTEMPTS = 1;
-
+
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;
public static final boolean DEFAULT_IS_HA = false;
@@ -97,35 +106,57 @@
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
public static final boolean DEFAULT_XA = false;
-
+
public static final boolean DEFAULT_HA = false;
-
+
/**
* Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
* as the cluster topology changes, and no HA backup information is propagated to the client
- *
+ *
* @param transportConfigurations
* @return the ServerLocator
*/
public static ServerLocator createServerLocatorWithoutHA(TransportConfiguration... transportConfigurations)
{
- return new ServerLocatorImpl(false, transportConfigurations);
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME, Arrays.asList(transportConfigurations));
+ DiscoveryGroupConfiguration config =
+ new DiscoveryGroupConfiguration("org.hornetq.core.client.impl.StaticServerLocatorImpl",
+ "org.hornetq.core.server.cluster.impl.StaticClusterConnectorImpl",
+ params,
+ UUIDGenerator.getInstance().generateStringUUID());
+ return createServerLocatorWithoutHA(config);
}
-
+
/**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
- *
+ *
* The UDP address and port are used to listen for live servers in the cluster
- *
+ *
* @param discoveryAddress The UDP group address to listen for updates
* @param discoveryPort the UDP port to listen for updates
* @return the ServerLocator
*/
public static ServerLocator createServerLocatorWithoutHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(false, groupConfiguration);
+ ServerLocator serverLocator = null;
+ String className = groupConfiguration.getServerLocatorClassName();
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(boolean.class, DiscoveryGroupConfiguration.class);
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.FALSE, groupConfiguration);
+ }
+ catch (Exception e)
+ {
+ log.fatal("Could not instantiate ServerLocator implementation class: ", e);
+ return null;
+ }
+
+ return serverLocator;
}
-
+
/**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The initial list of servers supplied in this method is simply to make an initial connection to the cluster, once that connection is made, up to date
@@ -137,9 +168,16 @@
*/
public static ServerLocator createServerLocatorWithHA(TransportConfiguration... initialServers)
{
- return new ServerLocatorImpl(true, initialServers);
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME, Arrays.asList(initialServers));
+ DiscoveryGroupConfiguration config =
+ new DiscoveryGroupConfiguration("org.hornetq.core.client.impl.StaticServerLocatorImpl",
+ "org.hornetq.core.server.cluster.impl.StaticClusterConnectorImpl",
+ params,
+ UUIDGenerator.getInstance().generateStringUUID());
+ return createServerLocatorWithHA(config);
}
-
+
/**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The discoveryAddress and discoveryPort parameters in this method are used to listen for UDP broadcasts which contain connection information for members of the cluster.
@@ -152,10 +190,26 @@
*/
public static ServerLocator createServerLocatorWithHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(true, groupConfiguration);
+ ServerLocator serverLocator = null;
+ String className = groupConfiguration.getServerLocatorClassName();
+
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(boolean.class, DiscoveryGroupConfiguration.class);
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.TRUE, groupConfiguration);
+ }
+ catch (Exception e)
+ {
+ log.fatal("Could not instantiate ServerLocator implementation class", e);
+ return null;
+ }
+
+ return serverLocator;
}
-
+
private HornetQClient()
{
}
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/AbstractServerLocator.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,1505 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
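+ * Abstract base class for ServerLocatorInternal implementations. It holds the client connection settings,
+ * the thread pools, the created session factories and the cluster topology state shared by concrete locators.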
+ * @author Tim Fox
+ */
+public abstract class AbstractServerLocator implements ServerLocatorInternal, DiscoveryListener, Serializable
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ protected enum STATE
+ {
+ INITIALIZED, CLOSING, CLOSED,
+ };
+
+ private static final Logger log = Logger.getLogger(AbstractServerLocator.class);
+
+ private final boolean ha;
+
+ private boolean finalizeCheck = true;
+
+ private boolean clusterConnection;
+
+ private transient String identity;
+
+ private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+ private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
+
+ private TransportConfiguration[] initialConnectors;
+
+ private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
+ private final Topology topology;
+
+ private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
+
+ private boolean receivedTopology;
+
+ private boolean compressLargeMessage;
+
+ // if the system should shutdown the pool when shutting down
+ private transient boolean shutdownPool;
+
+ private ExecutorService threadPool;
+
+ private ScheduledExecutorService scheduledThreadPool;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+ // Settable attributes:
+
+ private boolean cacheLargeMessagesClient;
+
+ private long clientFailureCheckPeriod;
+
+ private long connectionTTL;
+
+ private long callTimeout;
+
+ private int minLargeMessageSize;
+
+ private int consumerWindowSize;
+
+ private int consumerMaxRate;
+
+ private int confirmationWindowSize;
+
+ private int producerWindowSize;
+
+ private int producerMaxRate;
+
+ private boolean blockOnAcknowledge;
+
+ private boolean blockOnDurableSend;
+
+ private boolean blockOnNonDurableSend;
+
+ private boolean autoGroup;
+
+ private boolean preAcknowledge;
+
+ private String connectionLoadBalancingPolicyClassName;
+
+ private int ackBatchSize;
+
+ private boolean useGlobalPools;
+
+ private int scheduledThreadPoolMaxSize;
+
+ private int threadPoolMaxSize;
+
+ private long retryInterval;
+
+ private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
+
+ private int reconnectAttempts;
+
+ private int initialConnectAttempts;
+
+ private boolean failoverOnInitialConnection;
+
+ private int initialMessagePacketSize;
+
+ private volatile STATE state;
+
+ private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+ private static ExecutorService globalThreadPool;
+
+ private Executor startExecutor;
+
+ private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private AfterConnectInternalListener afterConnectListener;
+
+ private String groupID;
+
+ private String nodeID;
+
+ private TransportConfiguration clusterTransportConfiguration;
+
+ private boolean backup;
+
+ private final Exception e = new Exception();
+
+ /**
+  * Shuts down and discards the shared (global) client thread pools.
+  * <p>
+  * Each existing pool is asked to shut down and is given up to 10 seconds to terminate. Its static
+  * reference is cleared whether or not termination succeeded, so the next call to
+  * {@link #getGlobalThreadPool()} or {@link #getGlobalScheduledThreadPool()} creates a fresh pool.
+  * If the calling thread is interrupted while waiting, its interrupt status is restored.
+  *
+  * @throws IllegalStateException if a pool does not terminate within the timeout
+  */
+ public static synchronized void clearThreadPools()
+ {
+    try
+    {
+       if (globalThreadPool != null)
+       {
+          globalThreadPool.shutdown();
+          try
+          {
+             if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS))
+             {
+                throw new IllegalStateException("Couldn't finish the globalThreadPool");
+             }
+          }
+          catch (InterruptedException e)
+          {
+             // Do not swallow the interrupt: leave it visible to the caller
+             Thread.currentThread().interrupt();
+          }
+          finally
+          {
+             globalThreadPool = null;
+          }
+       }
+    }
+    finally
+    {
+       // Runs even when the first pool timed out, so the scheduled pool is still shut down and cleared
+       if (globalScheduledThreadPool != null)
+       {
+          globalScheduledThreadPool.shutdown();
+          try
+          {
+             if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS))
+             {
+                throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
+             }
+          }
+          catch (InterruptedException e)
+          {
+             Thread.currentThread().interrupt();
+          }
+          finally
+          {
+             globalScheduledThreadPool = null;
+          }
+       }
+    }
+ }
+
+ protected static synchronized ExecutorService getGlobalThreadPool()
+ {
+ if (globalThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
+
+ globalThreadPool = Executors.newCachedThreadPool(factory);
+ }
+
+ return globalThreadPool;
+ }
+
+ public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+ {
+ if (globalScheduledThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+ true,
+ getThisClassLoader());
+
+ globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+ factory);
+ }
+
+ return globalScheduledThreadPool;
+ }
+
+ protected void setThreadPools()
+ {
+ if (threadPool != null)
+ {
+ return;
+ }
+ else if (useGlobalPools)
+ {
+ threadPool = getGlobalThreadPool();
+
+ scheduledThreadPool = getGlobalScheduledThreadPool();
+ }
+ else
+ {
+ this.shutdownPool = true;
+
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+
+ if (threadPoolMaxSize == -1)
+ {
+ threadPool = Executors.newCachedThreadPool(factory);
+ }
+ else
+ {
+ threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+ }
+
+ factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+
+ scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+ }
+ }
+
+ protected static ClassLoader getThisClassLoader()
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
+ }
+
+ protected void instantiateLoadBalancingPolicy()
+ {
+ if (connectionLoadBalancingPolicyClassName == null)
+ {
+ throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+ }
+
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+ return null;
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+ "\"",
+ e);
+ }
+ }
+ });
+ }
+
+ protected synchronized void initialise() throws HornetQException
+ {
+ if (state == STATE.INITIALIZED)
+ {
+ return;
+ }
+ if (state == STATE.CLOSING)
+ {
+ throw new IllegalStateException("Cannot initialize 'closing' locator");
+ }
+ try
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ initialiseInternal();
+
+ state = STATE.INITIALIZED;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+ }
+
+ protected abstract void initialiseInternal() throws Exception;
+
+ /**
+  * Main constructor: stores the topology, HA flag, discovery configuration and static connectors,
+  * generates a fresh node id and resets every settable attribute to its {@link HornetQClient} default.
+  *
+  * @param topology the topology to track, may be null (the delegating constructors pass null when HA is off)
+  * @param useHA whether this locator is highly available
+  * @param discoveryGroupConfiguration the discovery group configuration, or null when static connectors are used
+  * @param transportConfigs the static list of initial connectors, or null when discovery is used
+  */
+ protected AbstractServerLocator(final Topology topology,
+                                 final boolean useHA,
+                                 final DiscoveryGroupConfiguration discoveryGroupConfiguration,
+                                 final TransportConfiguration[] transportConfigs)
+ {
+    // Records the stack trace of the creation point; presumably kept as a debugging aid (the field is not read in this class)
+    e.fillInStackTrace();
+
+    this.topology = topology;
+
+    this.ha = useHA;
+
+    this.discoveryGroupConfiguration = discoveryGroupConfiguration;
+
+    this.initialConnectors = transportConfigs;
+
+    this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+
+    clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+    connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+    callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+    minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+    consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+    consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+    confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+    producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+    producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+    blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+    blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+    blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+    autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+    preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+    ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+    connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+    useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+    scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+    threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+    retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+    retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+    maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+    reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+    initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
+
+    failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+    // Assigned once; the original set this same default a second time further down
+    cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+    initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+
+    compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
+
+    clusterConnection = false;
+ }
+
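+ /**
+ * Create a server locator that uses a discovery group (for example UDP discovery) to look up the cluster.
+ * When useHA is true a new Topology is created and this locator is set as its owner.
+ *
+ * @param useHA whether the locator is highly available
+ * @param groupConfiguration the discovery group configuration
+ */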
+ public AbstractServerLocator(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ topology.setOwner(this);
+ }
+ }
+
+ /**
+ * Create a ServerLocatorImpl using a static list of live servers
+ *
+ * @param transportConfigs
+ */
+ public AbstractServerLocator(final boolean useHA, final TransportConfiguration... transportConfigs)
+ {
+ this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ topology.setOwner(this);
+ }
+ }
+
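+ /**
+ * Create a server locator that uses a discovery group (for example UDP discovery) to look up the cluster,
+ * tracking an existing topology supplied by the caller.
+ *
+ * @param topology the topology to use
+ * @param useHA whether the locator is highly available
+ * @param groupConfiguration the discovery group configuration
+ */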
+ public AbstractServerLocator(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(topology, useHA, groupConfiguration, null);
+
+ }
+
+ /**
+ * Create a ServerLocatorImpl using a static list of live servers
+ *
+ * @param transportConfigs
+ */
+ public AbstractServerLocator(final Topology topology,
+ final boolean useHA,
+ final TransportConfiguration... transportConfigs)
+ {
+ this(topology, useHA, null, transportConfigs);
+ }
+
+ private TransportConfiguration selectConnector()
+ {
+ if (receivedTopology)
+ {
+ int pos = loadBalancingPolicy.select(topologyArray.length);
+
+ Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
+
+ return pair.getA();
+ }
+ else
+ {
+ // Get from initialconnectors
+
+ int pos = loadBalancingPolicy.select(initialConnectors.length);
+
+ return initialConnectors[pos];
+ }
+ }
+
+ public void start(Executor executor) throws Exception
+ {
+ initialise();
+
+ this.startExecutor = executor;
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ if (isInitialized())
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
+ }
+ }
+ });
+ }
+
+ public Executor getExecutor()
+ {
+ return startExecutor;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
+ */
+ public void disableFinalizeCheck()
+ {
+ finalizeCheck = false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
+ */
+ public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
+ {
+ this.afterConnectListener = listener;
+ }
+
+ public AfterConnectInternalListener getAfterConnectInternalListener()
+ {
+ return afterConnectListener;
+ }
+
+ public boolean isClosed()
+ {
+ return state == STATE.CLOSED || state == STATE.CLOSING;
+ }
+
+ public boolean isInitialized()
+ {
+ return state == STATE.INITIALIZED;
+ }
+
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ {
+ assertOpen();
+
+ initialise();
+
+ synchronized (this)
+ {
+ assertOpen();
+ ClientSessionFactoryInternal factory =
+ new ClientSessionFactoryImpl(this, transportConfiguration,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+ addToConnecting(factory);
+ try
+ {
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ addFactory(factory);
+ return factory;
+ }
+ finally
+ {
+ removeFromConnecting(factory);
+ }
+ }
+ }
+
+ /**
+  * Removes a factory from the set of factories that are currently connecting.
+  * <p>
+  * The set is a plain HashSet that {@link #addToConnecting} and the close path access while holding
+  * its monitor, so the removal takes the same monitor.
+  *
+  * @param factory the factory whose connection attempt has finished
+  */
+ protected void removeFromConnecting(ClientSessionFactoryInternal factory)
+ {
+    synchronized (connectingFactories)
+    {
+       connectingFactories.remove(factory);
+    }
+ }
+
+ protected void addToConnecting(ClientSessionFactoryInternal factory)
+ {
+ synchronized (connectingFactories)
+ {
+ assertOpen();
+ connectingFactories.add(factory);
+ }
+ }
+
+ protected void assertOpen()
+ {
+ if (state != null && state != STATE.INITIALIZED)
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+ }
+
+ /**
+  * Creates a session factory connected to a server chosen by the load balancing policy.
+  * <p>
+  * Connectors are tried one after another; a NOT_CONNECTED failure moves on to the next selection until
+  * as many attempts have been made as there are known servers. For HA or cluster-connection locators the
+  * method then waits up to 30 seconds for the cluster topology to arrive. If the calling thread is
+  * interrupted during that wait, the wait continues and the interrupt status is restored afterwards.
+  *
+  * @return the connected factory
+  * @throws HornetQException with code NOT_CONNECTED when every server was tried without success,
+  *         or CONNECTION_TIMEDOUT when the topology did not arrive in time
+  * @throws IllegalStateException if the locator is closing or closed
+  */
+ @Override
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+    assertOpen();
+
+    initialise();
+
+    waitInitialDiscovery();
+
+    ClientSessionFactoryInternal factory = null;
+
+    synchronized (this)
+    {
+       assertOpen();
+       boolean retry;
+       int attempts = 0;
+       do
+       {
+          retry = false;
+
+          TransportConfiguration tc = selectConnector();
+
+          // try each factory in the list until we find one which works
+
+          try
+          {
+             assertOpen();
+
+             factory =
+                new ClientSessionFactoryImpl(this, tc, callTimeout, clientFailureCheckPeriod, connectionTTL,
+                                             retryInterval, retryIntervalMultiplier, maxRetryInterval,
+                                             reconnectAttempts, threadPool, scheduledThreadPool, interceptors);
+             addToConnecting(factory);
+             try
+             {
+                factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+             }
+             finally
+             {
+                removeFromConnecting(factory);
+             }
+          }
+          catch (HornetQException e)
+          {
+             if (factory != null)
+             {
+                factory.close();
+             }
+             factory = null;
+             if (e.getCode() == HornetQException.NOT_CONNECTED)
+             {
+                attempts++;
+
+                if (topologyArray != null && attempts == topologyArray.length)
+                {
+                   throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                              "Cannot connect to server(s). Tried with all available servers.");
+                }
+                if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
+                {
+                   throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                              "Cannot connect to server(s). Tried with all available servers.");
+                }
+                retry = true;
+             }
+             else
+             {
+                throw e;
+             }
+          }
+       }
+       while (retry);
+
+       if (ha || clusterConnection)
+       {
+          long timeout = System.currentTimeMillis() + 30000;
+          boolean interrupted = false;
+          try
+          {
+             while (isInitialized() && !receivedTopology && timeout > System.currentTimeMillis())
+             {
+                // Now wait for the topology
+
+                try
+                {
+                   wait(1000);
+                }
+                catch (InterruptedException ignore)
+                {
+                   // Keep waiting as before, but remember the interrupt so it is not lost
+                   interrupted = true;
+                }
+             }
+          }
+          finally
+          {
+             // Restored only after the loop: re-interrupting inside it would make every wait() fail at once
+             if (interrupted)
+             {
+                Thread.currentThread().interrupt();
+             }
+          }
+
+          if (System.currentTimeMillis() > timeout && !receivedTopology && isInitialized())
+          {
+             throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                        "Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
+          }
+       }
+
+       addFactory(factory);
+
+       return factory;
+    }
+ }
+
+ protected abstract void waitInitialDiscovery() throws Exception;
+
+ public boolean isHA()
+ {
+ return ha;
+ }
+
+ public boolean isCacheLargeMessagesClient()
+ {
+ return cacheLargeMessagesClient;
+ }
+
+ public void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
+ public long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+ {
+ checkWrite();
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ }
+
+ public long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ public void setConnectionTTL(final long connectionTTL)
+ {
+ checkWrite();
+ this.connectionTTL = connectionTTL;
+ }
+
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ public void setCallTimeout(final long callTimeout)
+ {
+ checkWrite();
+ this.callTimeout = callTimeout;
+ }
+
+ public int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+ public void setMinLargeMessageSize(final int minLargeMessageSize)
+ {
+ checkWrite();
+ this.minLargeMessageSize = minLargeMessageSize;
+ }
+
+ public int getConsumerWindowSize()
+ {
+ return consumerWindowSize;
+ }
+
+ public void setConsumerWindowSize(final int consumerWindowSize)
+ {
+ checkWrite();
+ this.consumerWindowSize = consumerWindowSize;
+ }
+
+ public int getConsumerMaxRate()
+ {
+ return consumerMaxRate;
+ }
+
+ public void setConsumerMaxRate(final int consumerMaxRate)
+ {
+ checkWrite();
+ this.consumerMaxRate = consumerMaxRate;
+ }
+
+ public int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
+ public void setConfirmationWindowSize(final int confirmationWindowSize)
+ {
+ checkWrite();
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
+
+ public int getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+
+ public void setProducerWindowSize(final int producerWindowSize)
+ {
+ checkWrite();
+ this.producerWindowSize = producerWindowSize;
+ }
+
+ public int getProducerMaxRate()
+ {
+ return producerMaxRate;
+ }
+
+ public void setProducerMaxRate(final int producerMaxRate)
+ {
+ checkWrite();
+ this.producerMaxRate = producerMaxRate;
+ }
+
+ public boolean isBlockOnAcknowledge()
+ {
+ return blockOnAcknowledge;
+ }
+
+ public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ {
+ checkWrite();
+ this.blockOnAcknowledge = blockOnAcknowledge;
+ }
+
+ public boolean isBlockOnDurableSend()
+ {
+ return blockOnDurableSend;
+ }
+
+ public void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ {
+ checkWrite();
+ this.blockOnDurableSend = blockOnDurableSend;
+ }
+
+ public boolean isBlockOnNonDurableSend()
+ {
+ return blockOnNonDurableSend;
+ }
+
+ public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+ {
+ checkWrite();
+ this.blockOnNonDurableSend = blockOnNonDurableSend;
+ }
+
+ public boolean isAutoGroup()
+ {
+ return autoGroup;
+ }
+
+ public void setAutoGroup(final boolean autoGroup)
+ {
+ checkWrite();
+ this.autoGroup = autoGroup;
+ }
+
+ public boolean isPreAcknowledge()
+ {
+ return preAcknowledge;
+ }
+
+ public void setPreAcknowledge(final boolean preAcknowledge)
+ {
+ checkWrite();
+ this.preAcknowledge = preAcknowledge;
+ }
+
+ public int getAckBatchSize()
+ {
+ return ackBatchSize;
+ }
+
+ public void setAckBatchSize(final int ackBatchSize)
+ {
+ checkWrite();
+ this.ackBatchSize = ackBatchSize;
+ }
+
+ public boolean isUseGlobalPools()
+ {
+ return useGlobalPools;
+ }
+
+ public void setUseGlobalPools(final boolean useGlobalPools)
+ {
+ checkWrite();
+ this.useGlobalPools = useGlobalPools;
+ }
+
+ public int getScheduledThreadPoolMaxSize()
+ {
+ return scheduledThreadPoolMaxSize;
+ }
+
+ public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+ {
+ checkWrite();
+ this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+ }
+
+ public int getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
+
+ public void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ {
+ checkWrite();
+ this.threadPoolMaxSize = threadPoolMaxSize;
+ }
+
+ public long getRetryInterval()
+ {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ this.retryInterval = retryInterval;
+ }
+
+ public long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ public void setMaxRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ maxRetryInterval = retryInterval;
+ }
+
+ public double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+ {
+ checkWrite();
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ }
+
+ public int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
+ public void setReconnectAttempts(final int reconnectAttempts)
+ {
+ checkWrite();
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ public void setInitialConnectAttempts(int initialConnectAttempts)
+ {
+ checkWrite();
+ this.initialConnectAttempts = initialConnectAttempts;
+ }
+
+ public int getInitialConnectAttempts()
+ {
+ return initialConnectAttempts;
+ }
+
+ public boolean isFailoverOnInitialConnection()
+ {
+ return this.failoverOnInitialConnection;
+ }
+
+ public void setFailoverOnInitialConnection(final boolean failover)
+ {
+ checkWrite();
+ this.failoverOnInitialConnection = failover;
+ }
+
+ public String getConnectionLoadBalancingPolicyClassName()
+ {
+ return connectionLoadBalancingPolicyClassName;
+ }
+
+ public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+ {
+ checkWrite();
+ connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+ }
+
+ public TransportConfiguration[] getStaticTransportConfigurations()
+ {
+ return this.initialConnectors;
+ }
+
+ protected void setStaticTransportConfigurations(TransportConfiguration[] connectors)
+ {
+ initialConnectors = connectors;
+ }
+
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+ {
+ return discoveryGroupConfiguration;
+ }
+
+ public void addInterceptor(final Interceptor interceptor)
+ {
+ interceptors.add(interceptor);
+ }
+
+ public boolean removeInterceptor(final Interceptor interceptor)
+ {
+ return interceptors.remove(interceptor);
+ }
+
+ public int getInitialMessagePacketSize()
+ {
+ return initialMessagePacketSize;
+ }
+
+ public void setInitialMessagePacketSize(final int size)
+ {
+ checkWrite();
+ initialMessagePacketSize = size;
+ }
+
+ public void setGroupID(final String groupID)
+ {
+ checkWrite();
+ this.groupID = groupID;
+ }
+
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
+ */
+ public boolean isCompressLargeMessage()
+ {
+ return compressLargeMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
+ */
+ public void setCompressLargeMessage(boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
+ private void checkWrite()
+ {
+ if (state == STATE.INITIALIZED)
+ {
+ throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
+ }
+ }
+
+ public String getIdentity()
+ {
+ return identity;
+ }
+
+ public void setIdentity(String identity)
+ {
+ this.identity = identity;
+ }
+
+ public void setNodeID(String nodeID)
+ {
+ this.nodeID = nodeID;
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public void setClusterConnection(boolean clusterConnection)
+ {
+ this.clusterConnection = clusterConnection;
+ }
+
+ public boolean isClusterConnection()
+ {
+ return clusterConnection;
+ }
+
+ public TransportConfiguration getClusterTransportConfiguration()
+ {
+ return clusterTransportConfiguration;
+ }
+
+ public void setClusterTransportConfiguration(TransportConfiguration tc)
+ {
+ this.clusterTransportConfiguration = tc;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public void setBackup(boolean backup)
+ {
+ this.backup = backup;
+ }
+
+ @Override
+ protected void finalize() throws Throwable
+ {
+ if (finalizeCheck)
+ {
+ close();
+ }
+
+ super.finalize();
+ }
+
+ public void cleanup()
+ {
+ doClose(false);
+ }
+
+ public void close()
+ {
+ doClose(true);
+ }
+
+ /**
+  * Closes this locator: marks it CLOSING, runs the subclass hook, aborts and closes factories that are
+  * still connecting, closes (or only cleans up) the created factories, shuts down the thread pools
+  * when this locator created them, and finally marks the locator CLOSED.
+  * <p>
+  * If the calling thread is interrupted while waiting for a pool to terminate, the wait is abandoned
+  * and the interrupt status is restored.
+  *
+  * @param sendClose true to call close() on each factory, false to call cleanup() instead
+  */
+ private void doClose(final boolean sendClose)
+ {
+    if (state == STATE.CLOSED)
+    {
+       if (log.isDebugEnabled())
+       {
+          log.debug(this + " is already closed when calling closed");
+       }
+       return;
+    }
+
+    if (log.isDebugEnabled())
+    {
+       log.debug(this + " is calling close", new Exception("trace"));
+    }
+
+    state = STATE.CLOSING;
+
+    doCloseInternal();
+
+    synchronized (connectingFactories)
+    {
+       for (ClientSessionFactoryInternal factory : connectingFactories)
+       {
+          factory.causeExit();
+          factory.close();
+       }
+       connectingFactories.clear();
+    }
+
+    synchronized (factories)
+    {
+       // Iterate over a copy: closing a factory may call back into this locator and modify the set
+       Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
+
+       for (ClientSessionFactory factory : clonedFactory)
+       {
+          if (sendClose)
+          {
+             factory.close();
+          }
+          else
+          {
+             factory.cleanup();
+          }
+       }
+
+       factories.clear();
+    }
+
+    if (shutdownPool)
+    {
+       if (threadPool != null)
+       {
+          threadPool.shutdown();
+
+          try
+          {
+             if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+             {
+                log.warn("Timed out waiting for pool to terminate");
+             }
+          }
+          catch (InterruptedException e)
+          {
+             // Do not swallow the interrupt; the scheduled pool below is still shut down
+             Thread.currentThread().interrupt();
+          }
+       }
+
+       if (scheduledThreadPool != null)
+       {
+          scheduledThreadPool.shutdown();
+
+          try
+          {
+             if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+             {
+                log.warn("Timed out waiting for scheduled pool to terminate");
+             }
+          }
+          catch (InterruptedException e)
+          {
+             Thread.currentThread().interrupt();
+          }
+       }
+    }
+    state = STATE.CLOSED;
+ }
+
+ protected abstract void doCloseInternal();
+
+ /**
+  * This is directly called when the connection to the node is gone,
+  * or when the node sends a disconnection.
+  * Look for callers of this method!
+  * <p>
+  * Removes the node from the topology. If the topology becomes empty, or only this locator's own node
+  * remains, the locator goes back to the "no topology received" condition.
+  *
+  * @param eventTime the event time passed on to the topology when removing the member
+  * @param nodeID the id of the node that went down
+  */
+ public void notifyNodeDown(final long eventTime, final String nodeID)
+ {
+
+    if (topology == null)
+    {
+       // there's no topology here
+       return;
+    }
+
+    if (log.isDebugEnabled())
+    {
+       log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
+    }
+
+    if (topology.removeMember(eventTime, nodeID))
+    {
+       if (topology.isEmpty())
+       {
+          // Resetting the topology to its original condition as it was brand new
+          synchronized (this)
+          {
+             topologyArray = null;
+             receivedTopology = false;
+          }
+       }
+       else
+       {
+          updateArraysAndPairs();
+
+          if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+          {
+             // Resetting the topology to its original condition as it was brand new.
+             // Done under the same lock as the branch above and as the readers waiting on this monitor.
+             synchronized (this)
+             {
+                receivedTopology = false;
+             }
+          }
+       }
+    }
+
+ }
+
+ /**
+  * Called when a node joins the cluster or its connectors change.
+  * <p>
+  * Updates the topology with the node's connector pair. When the update is accepted and the node has both
+  * a live and a backup connector, every known factory is told about that pair, and the topology array
+  * is rebuilt. When this is the last notification of a batch, threads waiting for the topology are woken.
+  *
+  * @param uniqueEventID the event id passed on to the topology when updating the member
+  * @param nodeID the id of the node that came up
+  * @param connectorPair the live (A) and backup (B) connectors of the node
+  * @param last whether this is the last notification of the batch
+  */
+ public void notifyNodeUp(long uniqueEventID,
+                          final String nodeID,
+                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                          final boolean last)
+ {
+    if (topology == null)
+    {
+       // there's no topology
+       return;
+    }
+
+    if (log.isDebugEnabled())
+    {
+       log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
+    }
+
+    TopologyMember member = new TopologyMember(connectorPair.getA(), connectorPair.getB());
+
+    if (topology.updateMember(uniqueEventID, nodeID, member))
+    {
+
+       TopologyMember actMember = topology.getMember(nodeID);
+
+       if (actMember != null && actMember.getConnector().getA() != null && actMember.getConnector().getB() != null)
+       {
+          // factories is a plain HashSet modified under its own monitor elsewhere, so take a snapshot
+          // under that monitor and notify the factories outside of it
+          Set<ClientSessionFactoryInternal> snapshot;
+          synchronized (factories)
+          {
+             snapshot = new HashSet<ClientSessionFactoryInternal>(factories);
+          }
+
+          for (ClientSessionFactoryInternal factory : snapshot)
+          {
+             factory.setBackupConnector(actMember.getConnector().getA(), actMember.getConnector().getB());
+          }
+       }
+
+       updateArraysAndPairs();
+    }
+
+    if (last)
+    {
+       synchronized (this)
+       {
+          receivedTopology = true;
+          // Notify if waiting on getting topology
+          notifyAll();
+       }
+    }
+ }
+
+ /**
+  * Describes this locator by class name, optional identity, initial connectors and discovery configuration.
+  */
+ @Override
+ public String toString()
+ {
+    final StringBuilder text = new StringBuilder(this.getClass().getName());
+
+    // The identity part is only printed when an identity was set
+    if (identity != null)
+    {
+       text.append(" (identity=").append(identity).append(")");
+    }
+
+    text.append(" [initialConnectors=").append(Arrays.toString(initialConnectors));
+    text.append(", discoveryGroupConfiguration=").append(discoveryGroupConfiguration);
+    text.append("]");
+
+    return text.toString();
+ }
+
+ private synchronized void updateArraysAndPairs()
+ {
+ Collection<TopologyMember> membersCopy = topology.getMembers();
+
+ topologyArray =
+ (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class, membersCopy.size());
+
+ int count = 0;
+ for (TopologyMember pair : membersCopy)
+ {
+ topologyArray[count++] = pair.getConnector();
+ }
+ }
+
+ /**
+  * Discovery callback: replaces the initial connectors with the entries currently known to the
+  * discovery group and registers any node that the topology does not know yet.
+  * <p>
+  * For a cluster connection that has not received a topology yet, a connection attempt is made so that
+  * the node notification forming the cluster is triggered; a failure of that attempt is logged.
+  */
+ public synchronized void connectorsChanged()
+ {
+    // NOTE(review): discoveryGroup is private and never assigned in this class, so without this guard
+    // the call below would always fail with a NullPointerException - confirm how it is meant to be set
+    if (discoveryGroup == null)
+    {
+       return;
+    }
+
+    List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+    this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+                                                                        newConnectors.size());
+
+    int count = 0;
+    for (DiscoveryEntry entry : newConnectors)
+    {
+       this.initialConnectors[count++] = entry.getConnector();
+
+       if (topology != null && topology.getMember(entry.getNodeID()) == null)
+       {
+          TopologyMember member = new TopologyMember(entry.getConnector(), null);
+          // on this case we set it as zero as any update coming from server should be accepted
+          topology.updateMember(0, entry.getNodeID(), member);
+       }
+    }
+
+    if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
+    {
+       // FIXME the node is alone in the cluster. We create a connection to the new node
+       // to trigger the node notification to form the cluster.
+       try
+       {
+          connect();
+       }
+       catch (Exception e)
+       {
+          // Report through the class logger instead of printing the stack trace to stderr
+          log.warn("Failed to connect after the discovered connectors changed", e);
+       }
+    }
+ }
+
+ public synchronized void factoryClosed(final ClientSessionFactory factory)
+ {
+ factories.remove(factory);
+
+ if (!clusterConnection && factories.isEmpty())
+ {
+ // Go back to using the broadcast or static list
+
+ receivedTopology = false;
+
+ topologyArray = null;
+ }
+ }
+
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topology.addClusterTopologyListener(listener);
+ }
+
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topology.removeClusterTopologyListener(listener);
+ }
+
+ protected synchronized void addFactory(ClientSessionFactoryInternal factory)
+ {
+ if (factory == null)
+ {
+ return;
+ }
+
+ synchronized (factories)
+ {
+ if (isClosed())
+ {
+ factory.close();
+ return;
+ }
+
+ TransportConfiguration backup = null;
+
+ if (topology != null)
+ {
+ backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+ }
+
+ factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+ factories.add(factory);
+ }
+ }
+
+ protected STATE getState()
+ {
+ return state;
+ }
+
+ protected void setState(STATE s)
+ {
+ this.state = s;
+ }
+
+ protected synchronized ExecutorService getThreadPool()
+ {
+ return threadPool;
+ }
+
+ protected synchronized ScheduledExecutorService getScheduledThreadPool()
+ {
+ return scheduledThreadPool;
+ }
+
+ protected synchronized List<Interceptor> getInterceptors()
+ {
+ return interceptors;
+ }
+
+ protected synchronized boolean isFinalizeCheckEnabled()
+ {
+ return finalizeCheck;
+ }
+
+ protected synchronized boolean receivedTopology()
+ {
+ return receivedTopology;
+ }
+
+ protected synchronized ConnectionLoadBalancingPolicy getLoadBalancingPolicy()
+ {
+ return loadBalancingPolicy;
+ }
+
+ protected synchronized Pair<TransportConfiguration, TransportConfiguration>[] getTopologyArray()
+ {
+ return topologyArray;
+ }
+}
Deleted: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -1,1753 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * A ServerLocatorImpl
- *
- * @author Tim Fox
- */
-public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
-{
- private static final long serialVersionUID = -1615857864410205260L;
-
- private enum STATE
- {
- INITIALIZED, CLOSING, CLOSED,
- };
-
- private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
-
- private final boolean ha;
-
- private boolean finalizeCheck = true;
-
- private boolean clusterConnection;
-
- private transient String identity;
-
- private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
- private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
-
- private TransportConfiguration[] initialConnectors;
-
- private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
-
- private final StaticConnector staticConnector = new StaticConnector();
-
- private final Topology topology;
-
- private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
-
- private boolean receivedTopology;
-
- private boolean compressLargeMessage;
-
- // if the system should shutdown the pool when shutting down
- private transient boolean shutdownPool;
-
- private ExecutorService threadPool;
-
- private ScheduledExecutorService scheduledThreadPool;
-
- private DiscoveryGroup discoveryGroup;
-
- private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
- // Settable attributes:
-
- private boolean cacheLargeMessagesClient;
-
- private long clientFailureCheckPeriod;
-
- private long connectionTTL;
-
- private long callTimeout;
-
- private int minLargeMessageSize;
-
- private int consumerWindowSize;
-
- private int consumerMaxRate;
-
- private int confirmationWindowSize;
-
- private int producerWindowSize;
-
- private int producerMaxRate;
-
- private boolean blockOnAcknowledge;
-
- private boolean blockOnDurableSend;
-
- private boolean blockOnNonDurableSend;
-
- private boolean autoGroup;
-
- private boolean preAcknowledge;
-
- private String connectionLoadBalancingPolicyClassName;
-
- private int ackBatchSize;
-
- private boolean useGlobalPools;
-
- private int scheduledThreadPoolMaxSize;
-
- private int threadPoolMaxSize;
-
- private long retryInterval;
-
- private double retryIntervalMultiplier;
-
- private long maxRetryInterval;
-
- private int reconnectAttempts;
-
- private int initialConnectAttempts;
-
- private boolean failoverOnInitialConnection;
-
- private int initialMessagePacketSize;
-
- private volatile STATE state;
-
- private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
-
- private static ExecutorService globalThreadPool;
-
- private Executor startExecutor;
-
- private static ScheduledExecutorService globalScheduledThreadPool;
-
- private AfterConnectInternalListener afterConnectListener;
-
- private String groupID;
-
- private String nodeID;
-
- private TransportConfiguration clusterTransportConfiguration;
-
- private boolean backup;
-
- private final Exception e = new Exception();
-
- // To be called when there are ServerLocator being finalized.
- // To be used on test assertions
- public static Runnable finalizeCallback = null;
-
- public static synchronized void clearThreadPools()
- {
-
- if (globalThreadPool != null)
- {
- globalThreadPool.shutdown();
- try
- {
- if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS))
- {
- throw new IllegalStateException("Couldn't finish the globalThreadPool");
- }
- }
- catch (InterruptedException e)
- {
- }
- finally
- {
- globalThreadPool = null;
- }
- }
-
- if (globalScheduledThreadPool != null)
- {
- globalScheduledThreadPool.shutdown();
- try
- {
- if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS))
- {
- throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool");
- }
- }
- catch (InterruptedException e)
- {
- }
- finally
- {
- globalScheduledThreadPool = null;
- }
- }
- }
-
- private static synchronized ExecutorService getGlobalThreadPool()
- {
- if (globalThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
-
- globalThreadPool = Executors.newCachedThreadPool(factory);
- }
-
- return globalThreadPool;
- }
-
- public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
- {
- if (globalScheduledThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
-
- globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
- factory);
- }
-
- return globalScheduledThreadPool;
- }
-
- private void setThreadPools()
- {
- if (threadPool != null)
- {
- return;
- }
- else if (useGlobalPools)
- {
- threadPool = getGlobalThreadPool();
-
- scheduledThreadPool = getGlobalScheduledThreadPool();
- }
- else
- {
- this.shutdownPool = true;
-
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- if (threadPoolMaxSize == -1)
- {
- threadPool = Executors.newCachedThreadPool(factory);
- }
- else
- {
- threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
- }
-
- factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
- }
- }
-
- private static ClassLoader getThisClassLoader()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
- }
-
- private void instantiateLoadBalancingPolicy()
- {
- if (connectionLoadBalancingPolicyClassName == null)
- {
- throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
- }
-
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- public Object run()
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
- return null;
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
- }
- }
- });
- }
-
- private synchronized void initialise() throws HornetQException
- {
- if (state == STATE.INITIALIZED)
- {
- return;
- }
- if (state == STATE.CLOSING)
- {
- throw new IllegalStateException("Cannot initialize 'closing' locator");
- }
- try
- {
- setThreadPools();
-
- instantiateLoadBalancingPolicy();
-
- if (discoveryGroupConfiguration != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
-
- InetAddress lbAddress;
-
- if (discoveryGroupConfiguration.getLocalBindAddress() != null)
- {
- lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
- }
- else
- {
- lbAddress = null;
- }
-
- discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
-
- state = STATE.INITIALIZED;
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
- }
-
- private ServerLocatorImpl(final Topology topology,
- final boolean useHA,
- final DiscoveryGroupConfiguration discoveryGroupConfiguration,
- final TransportConfiguration[] transportConfigs)
- {
- e.fillInStackTrace();
-
- this.topology = topology;
-
- this.ha = useHA;
-
- this.discoveryGroupConfiguration = discoveryGroupConfiguration;
-
- this.initialConnectors = transportConfigs;
-
- this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
- clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
- connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
- minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
- consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
- confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
- producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
- producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
- blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
- blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
- blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
- autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
- preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
- ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
- connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
- useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
- scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
- threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
- retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
- retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
- maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
- reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
- initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
-
- failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
- clusterConnection = false;
- }
-
- /**
- * Create a ServerLocatorImpl using UDP discovery to lookup cluster
- *
- * @param discoveryAddress
- * @param discoveryPort
- */
- public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
- {
- this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
- if (useHA)
- {
- // We only set the owner at where the Topology was created.
- // For that reason we can't set it at the main constructor
- topology.setOwner(this);
- }
- }
-
- /**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
- {
- this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
- if (useHA)
- {
- // We only set the owner at where the Topology was created.
- // For that reason we can't set it at the main constructor
- topology.setOwner(this);
- }
- }
-
- /**
- * Create a ServerLocatorImpl using UDP discovery to lookup cluster
- *
- * @param discoveryAddress
- * @param discoveryPort
- */
- public ServerLocatorImpl(final Topology topology,
- final boolean useHA,
- final DiscoveryGroupConfiguration groupConfiguration)
- {
- this(topology, useHA, groupConfiguration, null);
-
- }
-
- /**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public ServerLocatorImpl(final Topology topology,
- final boolean useHA,
- final TransportConfiguration... transportConfigs)
- {
- this(topology, useHA, null, transportConfigs);
- }
-
- private TransportConfiguration selectConnector()
- {
- if (receivedTopology)
- {
- int pos = loadBalancingPolicy.select(topologyArray.length);
-
- Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
-
- return pair.getA();
- }
- else
- {
- // Get from initialconnectors
-
- int pos = loadBalancingPolicy.select(initialConnectors.length);
-
- return initialConnectors[pos];
- }
- }
-
- public void start(Executor executor) throws Exception
- {
- initialise();
-
- this.startExecutor = executor;
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- connect();
- }
- catch (Exception e)
- {
- if (isInitialized())
- {
- log.warn("did not connect the cluster connection to other nodes", e);
- }
- }
- }
- });
- }
-
- public Executor getExecutor()
- {
- return startExecutor;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
- */
- public void disableFinalizeCheck()
- {
- finalizeCheck = false;
- }
-
- public ClientSessionFactoryInternal connect() throws Exception
- {
- // static list of initial connectors
- if (initialConnectors != null && discoveryGroup == null)
- {
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
- addFactory(sf);
- return sf;
- }
- // wait for discovery group to get the list of initial connectors
- return (ClientSessionFactoryInternal)createSessionFactory();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.client.impl.ServerLocatorInternal#setAfterConnectionInternalListener(org.hornetq.core.client.impl.AfterConnectInternalListener)
- */
- public void setAfterConnectionInternalListener(AfterConnectInternalListener listener)
- {
- this.afterConnectListener = listener;
- }
-
- public AfterConnectInternalListener getAfterConnectInternalListener()
- {
- return afterConnectListener;
- }
-
- public boolean isClosed()
- {
- return state == STATE.CLOSED || state == STATE.CLOSING;
- }
-
- public boolean isInitialized()
- {
- return state == STATE.INITIALIZED;
- }
-
- public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
- {
- assertOpen();
-
- initialise();
-
- synchronized (this)
- {
- assertOpen();
- ClientSessionFactoryInternal factory =
- new ClientSessionFactoryImpl(this, transportConfiguration,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- addToConnecting(factory);
- try
- {
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
- addFactory(factory);
- return factory;
- }
- finally
- {
- removeFromConnecting(factory);
- }
- }
- }
-
- private void removeFromConnecting(ClientSessionFactoryInternal factory)
- {
- connectingFactories.remove(factory);
- }
-
- private void addToConnecting(ClientSessionFactoryInternal factory)
- {
- synchronized (connectingFactories)
- {
- assertOpen();
- connectingFactories.add(factory);
- }
- }
- private void assertOpen()
- {
- if (state != null && state != STATE.INITIALIZED)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
- }
-
- public ClientSessionFactory createSessionFactory() throws Exception
- {
- assertOpen();
-
- initialise();
-
- if (initialConnectors == null && discoveryGroup != null)
- {
- // Wait for an initial broadcast to give us at least one node in the cluster
- long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
- boolean ok = discoveryGroup.waitForBroadcast(timeout);
-
- if (!ok)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from cluster");
- }
- }
-
- ClientSessionFactoryInternal factory = null;
-
- synchronized (this)
- {
- assertOpen();
- boolean retry;
- int attempts = 0;
- do
- {
- retry = false;
-
- TransportConfiguration tc = selectConnector();
-
- // try each factory in the list until we find one which works
-
- try
- {
- assertOpen();
-
- factory =
- new ClientSessionFactoryImpl(this, tc, callTimeout, clientFailureCheckPeriod, connectionTTL,
- retryInterval, retryIntervalMultiplier, maxRetryInterval,
- reconnectAttempts, threadPool, scheduledThreadPool, interceptors);
- addToConnecting(factory);
- try
- {
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
- }
- finally
- {
- removeFromConnecting(factory);
- }
- }
- catch (HornetQException e)
- {
- if (factory != null)
- {
- factory.close();
- }
- factory = null;
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- attempts++;
-
- if (topologyArray != null && attempts == topologyArray.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- retry = true;
- }
- else
- {
- throw e;
- }
- }
- }
- while (retry);
-
- if (ha || clusterConnection)
- {
- long timeout = System.currentTimeMillis() + 30000;
- while (isInitialized() && !receivedTopology && timeout > System.currentTimeMillis())
- {
- // Now wait for the topology
-
- try
- {
- wait(1000);
- }
- catch (InterruptedException ignore)
- {
- }
- }
-
- if (System.currentTimeMillis() > timeout && !receivedTopology && isInitialized())
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
- }
- }
-
- addFactory(factory);
-
- return factory;
- }
- }
-
- public boolean isHA()
- {
- return ha;
- }
-
- public boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
- public long getClientFailureCheckPeriod()
- {
- return clientFailureCheckPeriod;
- }
-
- public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
- {
- checkWrite();
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- }
-
- public long getConnectionTTL()
- {
- return connectionTTL;
- }
-
- public void setConnectionTTL(final long connectionTTL)
- {
- checkWrite();
- this.connectionTTL = connectionTTL;
- }
-
- public long getCallTimeout()
- {
- return callTimeout;
- }
-
- public void setCallTimeout(final long callTimeout)
- {
- checkWrite();
- this.callTimeout = callTimeout;
- }
-
- public int getMinLargeMessageSize()
- {
- return minLargeMessageSize;
- }
-
- public void setMinLargeMessageSize(final int minLargeMessageSize)
- {
- checkWrite();
- this.minLargeMessageSize = minLargeMessageSize;
- }
-
- public int getConsumerWindowSize()
- {
- return consumerWindowSize;
- }
-
- public void setConsumerWindowSize(final int consumerWindowSize)
- {
- checkWrite();
- this.consumerWindowSize = consumerWindowSize;
- }
-
- public int getConsumerMaxRate()
- {
- return consumerMaxRate;
- }
-
- public void setConsumerMaxRate(final int consumerMaxRate)
- {
- checkWrite();
- this.consumerMaxRate = consumerMaxRate;
- }
-
- public int getConfirmationWindowSize()
- {
- return confirmationWindowSize;
- }
-
- public void setConfirmationWindowSize(final int confirmationWindowSize)
- {
- checkWrite();
- this.confirmationWindowSize = confirmationWindowSize;
- }
-
- public int getProducerWindowSize()
- {
- return producerWindowSize;
- }
-
- public void setProducerWindowSize(final int producerWindowSize)
- {
- checkWrite();
- this.producerWindowSize = producerWindowSize;
- }
-
- public int getProducerMaxRate()
- {
- return producerMaxRate;
- }
-
- public void setProducerMaxRate(final int producerMaxRate)
- {
- checkWrite();
- this.producerMaxRate = producerMaxRate;
- }
-
- public boolean isBlockOnAcknowledge()
- {
- return blockOnAcknowledge;
- }
-
- public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
- {
- checkWrite();
- this.blockOnAcknowledge = blockOnAcknowledge;
- }
-
- public boolean isBlockOnDurableSend()
- {
- return blockOnDurableSend;
- }
-
- public void setBlockOnDurableSend(final boolean blockOnDurableSend)
- {
- checkWrite();
- this.blockOnDurableSend = blockOnDurableSend;
- }
-
- public boolean isBlockOnNonDurableSend()
- {
- return blockOnNonDurableSend;
- }
-
- public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
- {
- checkWrite();
- this.blockOnNonDurableSend = blockOnNonDurableSend;
- }
-
- public boolean isAutoGroup()
- {
- return autoGroup;
- }
-
- public void setAutoGroup(final boolean autoGroup)
- {
- checkWrite();
- this.autoGroup = autoGroup;
- }
-
- public boolean isPreAcknowledge()
- {
- return preAcknowledge;
- }
-
- public void setPreAcknowledge(final boolean preAcknowledge)
- {
- checkWrite();
- this.preAcknowledge = preAcknowledge;
- }
-
- public int getAckBatchSize()
- {
- return ackBatchSize;
- }
-
- public void setAckBatchSize(final int ackBatchSize)
- {
- checkWrite();
- this.ackBatchSize = ackBatchSize;
- }
-
- public boolean isUseGlobalPools()
- {
- return useGlobalPools;
- }
-
- public void setUseGlobalPools(final boolean useGlobalPools)
- {
- checkWrite();
- this.useGlobalPools = useGlobalPools;
- }
-
- public int getScheduledThreadPoolMaxSize()
- {
- return scheduledThreadPoolMaxSize;
- }
-
- public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
- {
- checkWrite();
- this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
- }
-
- public int getThreadPoolMaxSize()
- {
- return threadPoolMaxSize;
- }
-
- public void setThreadPoolMaxSize(final int threadPoolMaxSize)
- {
- checkWrite();
- this.threadPoolMaxSize = threadPoolMaxSize;
- }
-
- public long getRetryInterval()
- {
- return retryInterval;
- }
-
- public void setRetryInterval(final long retryInterval)
- {
- checkWrite();
- this.retryInterval = retryInterval;
- }
-
- public long getMaxRetryInterval()
- {
- return maxRetryInterval;
- }
-
- public void setMaxRetryInterval(final long retryInterval)
- {
- checkWrite();
- maxRetryInterval = retryInterval;
- }
-
- public double getRetryIntervalMultiplier()
- {
- return retryIntervalMultiplier;
- }
-
- public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
- {
- checkWrite();
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- }
-
- public int getReconnectAttempts()
- {
- return reconnectAttempts;
- }
-
- public void setReconnectAttempts(final int reconnectAttempts)
- {
- checkWrite();
- this.reconnectAttempts = reconnectAttempts;
- }
-
- public void setInitialConnectAttempts(int initialConnectAttempts)
- {
- checkWrite();
- this.initialConnectAttempts = initialConnectAttempts;
- }
-
- public int getInitialConnectAttempts()
- {
- return initialConnectAttempts;
- }
-
- public boolean isFailoverOnInitialConnection()
- {
- return this.failoverOnInitialConnection;
- }
-
- public void setFailoverOnInitialConnection(final boolean failover)
- {
- checkWrite();
- this.failoverOnInitialConnection = failover;
- }
-
- public String getConnectionLoadBalancingPolicyClassName()
- {
- return connectionLoadBalancingPolicyClassName;
- }
-
- public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
- {
- checkWrite();
- connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
- }
-
- public TransportConfiguration[] getStaticTransportConfigurations()
- {
- return this.initialConnectors;
- }
-
- public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
- {
- return discoveryGroupConfiguration;
- }
-
- public void addInterceptor(final Interceptor interceptor)
- {
- interceptors.add(interceptor);
- }
-
- public boolean removeInterceptor(final Interceptor interceptor)
- {
- return interceptors.remove(interceptor);
- }
-
- public int getInitialMessagePacketSize()
- {
- return initialMessagePacketSize;
- }
-
- public void setInitialMessagePacketSize(final int size)
- {
- checkWrite();
- initialMessagePacketSize = size;
- }
-
- public void setGroupID(final String groupID)
- {
- checkWrite();
- this.groupID = groupID;
- }
-
- public String getGroupID()
- {
- return groupID;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
- */
- public boolean isCompressLargeMessage()
- {
- return compressLargeMessage;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
- */
- public void setCompressLargeMessage(boolean compress)
- {
- this.compressLargeMessage = compress;
- }
-
- private void checkWrite()
- {
- if (state == STATE.INITIALIZED)
- {
- throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
- }
- }
-
- public String getIdentity()
- {
- return identity;
- }
-
- public void setIdentity(String identity)
- {
- this.identity = identity;
- }
-
- public void setNodeID(String nodeID)
- {
- this.nodeID = nodeID;
- }
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public void setClusterConnection(boolean clusterConnection)
- {
- this.clusterConnection = clusterConnection;
- }
-
- public boolean isClusterConnection()
- {
- return clusterConnection;
- }
-
- public TransportConfiguration getClusterTransportConfiguration()
- {
- return clusterTransportConfiguration;
- }
-
- public void setClusterTransportConfiguration(TransportConfiguration tc)
- {
- this.clusterTransportConfiguration = tc;
- }
-
- public boolean isBackup()
- {
- return backup;
- }
-
- public void setBackup(boolean backup)
- {
- this.backup = backup;
- }
-
- @Override
- protected void finalize() throws Throwable
- {
- if (finalizeCheck)
- {
- close();
- }
-
- super.finalize();
- }
-
- public void cleanup()
- {
- doClose(false);
- }
-
- public void close()
- {
- doClose(true);
- }
-
- private void doClose(final boolean sendClose)
- {
- if (state == STATE.CLOSED)
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + " is already closed when calling closed");
- }
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug(this + " is calling close", new Exception("trace"));
- }
-
- state = STATE.CLOSING;
-
- if (discoveryGroup != null)
- {
- try
- {
- discoveryGroup.stop();
- }
- catch (Exception e)
- {
- log.error("Failed to stop discovery group", e);
- }
- }
- else
- {
- staticConnector.disconnect();
- }
-
- synchronized (connectingFactories)
- {
- for (ClientSessionFactoryInternal factory : connectingFactories)
- {
- factory.causeExit();
- factory.close();
- }
- connectingFactories.clear();
- }
-
- synchronized (factories)
- {
- Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
-
- for (ClientSessionFactory factory : clonedFactory)
- {
- if (sendClose)
- {
- factory.close();
- }
- else
- {
- factory.cleanup();
- }
- }
-
- factories.clear();
- }
-
- if (shutdownPool)
- {
- if (threadPool != null)
- {
- threadPool.shutdown();
-
- try
- {
- if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
-
- if (scheduledThreadPool != null)
- {
- scheduledThreadPool.shutdown();
-
- try
- {
- if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for scheduled pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
- state = STATE.CLOSED;
- }
-
- /** This is directly called when the connection to the node is gone,
- * or when the node sends a disconnection.
- * Look for callers of this method! */
- public void notifyNodeDown(final long eventTime, final String nodeID)
- {
-
- if (topology == null)
- {
- // there's no topology here
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
- }
-
- if (topology.removeMember(eventTime, nodeID))
- {
- if (topology.isEmpty())
- {
- // Resetting the topology to its original condition as it was brand new
- synchronized (this)
- {
- topologyArray = null;
- receivedTopology = false;
- }
- }
- else
- {
- updateArraysAndPairs();
-
- if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
- {
- // Resetting the topology to its original condition as it was brand new
- receivedTopology = false;
- }
- }
- }
-
- }
-
- public void notifyNodeUp(long uniqueEventID,
- final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
- {
- if (topology == null)
- {
- // there's no topology
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
- }
-
- TopologyMember member = new TopologyMember(connectorPair.getA(), connectorPair.getB());
-
- if (topology.updateMember(uniqueEventID, nodeID, member))
- {
-
- TopologyMember actMember = topology.getMember(nodeID);
-
- if (actMember != null && actMember.getConnector().getA() != null && actMember.getConnector().getB() != null)
- {
- for (ClientSessionFactory factory : factories)
- {
- ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().getA(),
- actMember.getConnector().getB());
- }
- }
-
- updateArraysAndPairs();
- }
-
- if (last)
- {
- synchronized (this)
- {
- receivedTopology = true;
- // Notify if waiting on getting topology
- notifyAll();
- }
- }
- }
-
- @Override
- public String toString()
- {
- if (identity != null)
- {
- return "ServerLocatorImpl (identity=" + identity +
- ") [initialConnectors=" +
- Arrays.toString(initialConnectors) +
- ", discoveryGroupConfiguration=" +
- discoveryGroupConfiguration +
- "]";
- }
- return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
- ", discoveryGroupConfiguration=" +
- discoveryGroupConfiguration +
- "]";
- }
-
- private synchronized void updateArraysAndPairs()
- {
- Collection<TopologyMember> membersCopy = topology.getMembers();
-
- topologyArray =
- (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class, membersCopy.size());
-
- int count = 0;
- for (TopologyMember pair : membersCopy)
- {
- topologyArray[count++] = pair.getConnector();
- }
- }
-
- public synchronized void connectorsChanged()
- {
- List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
-
- this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
- newConnectors.size());
-
- int count = 0;
- for (DiscoveryEntry entry : newConnectors)
- {
- this.initialConnectors[count++] = entry.getConnector();
-
- if (topology != null && topology.getMember(entry.getNodeID()) == null)
- {
- TopologyMember member = new TopologyMember(entry.getConnector(), null);
- // on this case we set it as zero as any update coming from server should be accepted
- topology.updateMember(0, entry.getNodeID(), member);
- }
- }
-
- if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
- {
- // FIXME the node is alone in the cluster. We create a connection to the new node
- // to trigger the node notification to form the cluster.
- try
- {
- connect();
- }
- catch (Exception e)
- {
- e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- public synchronized void factoryClosed(final ClientSessionFactory factory)
- {
- factories.remove(factory);
-
- if (!clusterConnection && factories.isEmpty())
- {
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topologyArray = null;
- }
- }
-
- public Topology getTopology()
- {
- return topology;
- }
-
- public void addClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topology.addClusterTopologyListener(listener);
- }
-
- public void removeClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topology.removeClusterTopologyListener(listener);
- }
-
- private synchronized void addFactory(ClientSessionFactoryInternal factory)
- {
- if (factory == null)
- {
- return;
- }
-
- synchronized (factories)
- {
- if (isClosed())
- {
- factory.close();
- return;
- }
-
- TransportConfiguration backup = null;
-
- if (topology != null)
- {
- backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
- }
-
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
- factories.add(factory);
- }
- }
-
- private final class StaticConnector implements Serializable
- {
- private static final long serialVersionUID = 6772279632415242634l;
-
- private List<Connector> connectors;
-
- public ClientSessionFactory connect() throws HornetQException
- {
- if (isClosed())
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- initialise();
-
- ClientSessionFactory csf = null;
-
- createConnectors();
-
- try
- {
-
- int retryNumber = 0;
- while (csf == null && isInitialized())
- {
- retryNumber++;
- for (Connector conn : connectors)
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + "::Submitting connect towards " + conn);
- }
-
- csf = conn.tryConnect();
-
- if (csf != null)
- {
- csf.getConnection().addFailureListener(new FailureListener()
- {
- // Case the node where the cluster connection was connected is gone, we need to restart the
- // connection
- public void connectionFailed(HornetQException exception, boolean failedOver)
- {
- if (clusterConnection && exception.getCode() == HornetQException.DISCONNECTED)
- {
- try
- {
- ServerLocatorImpl.this.start(startExecutor);
- }
- catch (Exception e)
- {
- // There isn't much to be done if this happens here
- log.warn(e.getMessage());
- }
- }
- }
- });
-
- if (log.isDebugEnabled())
- {
- log.debug("Returning " + csf +
- " after " +
- retryNumber +
- " retries on StaticConnector " +
- ServerLocatorImpl.this);
- }
-
- return csf;
- }
- }
-
- if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts)
- {
- break;
- }
-
- if (isInitialized())
- {
- Thread.sleep(retryInterval);
- }
- }
-
- }
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
- }
-
- if (csf == null && isInitialized())
- {
- log.warn("Failed to connecto to any static connector, throwing exception now");
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
- }
- if (log.isDebugEnabled())
- {
- log.debug("Returning " + csf + " on " + ServerLocatorImpl.this);
- }
- return csf;
- }
-
- private synchronized void createConnectors()
- {
- if (connectors != null)
- {
- for (Connector conn : connectors)
- {
- if (conn != null)
- {
- conn.disconnect();
- }
- }
- }
- connectors = new ArrayList<Connector>();
- for (TransportConfiguration initialConnector : initialConnectors)
- {
- assertOpen();
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- factory.disableFinalizeCheck();
-
- connectors.add(new Connector(initialConnector, factory));
- }
- }
-
- public synchronized void disconnect()
- {
- if (connectors != null)
- {
- for (Connector connector : connectors)
- {
- connector.disconnect();
- }
- }
- }
-
- @Override
- public void finalize() throws Throwable
- {
- if (state != STATE.CLOSED && finalizeCheck)
- {
- log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
- System.identityHashCode(this));
-
- log.warn("The ServerLocator you didn't close was created here:", e);
-
- if (ServerLocatorImpl.finalizeCallback != null)
- {
- ServerLocatorImpl.finalizeCallback.run();
- }
-
- close();
- }
-
- super.finalize();
- }
-
- private class Connector
- {
- private final TransportConfiguration initialConnector;
-
- private volatile ClientSessionFactoryInternal factory;
-
- private Exception e;
-
- Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
- {
- this.initialConnector = initialConnector;
- this.factory = factory;
- }
-
- private ClientSessionFactory tryConnect() throws HornetQException
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + "::Trying to connect to " + factory);
- }
- try
- {
- ClientSessionFactoryInternal factoryToUse = factory;
- if (factoryToUse != null)
- {
- addToConnecting(factoryToUse);
-
- try
- {
- factoryToUse.connect(1, false);
- }
- finally
- {
- removeFromConnecting(factoryToUse);
- }
- }
- return factoryToUse;
- }
- catch (HornetQException e)
- {
- log.debug(this + "::Exception on establish connector initial connection", e);
- return null;
- }
- }
-
- public void disconnect()
- {
- if (factory != null)
- {
- factory.causeExit();
- factory.cleanup();
- factory = null;
- }
- }
-
- @Override
- public String toString()
- {
- return "Connector [initialConnector=" + initialConnector + "]";
- }
-
- }
- }
-}
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
+
+/**
+ * A StaticServerLocatorImpl
+ * @author Tim Fox
+ */
+public class StaticServerLocatorImpl extends AbstractServerLocator
+{
+ private static final long serialVersionUID = 6473494410188544024L;
+
+ private static final Logger log = Logger.getLogger(StaticServerLocatorImpl.class);
+
+ private final StaticConnector staticConnector = new StaticConnector();
+
+ private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ /**
+ * No-op in this implementation: the method body is intentionally empty.
+ */
+ @Override
+ protected void initialiseInternal()
+ {
+ // Nothing for this ServerLocator
+ }
+
+ /**
+  * Shared constructor that every public constructor delegates to.
+  * <p>
+  * When a discovery group configuration is supplied and its parameter map holds a connector
+  * list under {@link DiscoveryGroupConstants#STATIC_CONNECTOR_CONFIG_LIST_NAME}, that list
+  * becomes the static transport configurations. Otherwise nothing is overridden and whatever
+  * the superclass constructor did with {@code transportConfigs} is left in place.
+  *
+  * @param topology passed to the superclass; null when the caller did not ask for HA
+  * @param useHA passed to the superclass
+  * @param discoveryGroupConfiguration may be null (the TransportConfiguration-based
+  *        constructors pass null)
+  * @param transportConfigs passed to the superclass; may be null
+  */
+ @SuppressWarnings("unchecked")
+ private StaticServerLocatorImpl(final Topology topology,
+                                 final boolean useHA,
+                                 final DiscoveryGroupConfiguration discoveryGroupConfiguration,
+                                 final TransportConfiguration[] transportConfigs)
+ {
+    super(topology, useHA, discoveryGroupConfiguration, transportConfigs);
+
+    // The TransportConfiguration-based constructors hand in a null configuration, so the
+    // parameter map must not be dereferenced unconditionally.
+    if (discoveryGroupConfiguration != null)
+    {
+       Map<String, Object> params = discoveryGroupConfiguration.getParams();
+       if (params != null)
+       {
+          // The map is untyped; the value stored under this key is expected to be a connector list.
+          List<TransportConfiguration> initialConnectors =
+             (List<TransportConfiguration>)params.get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
+          if (initialConnectors != null)
+          {
+             setStaticTransportConfigurations(initialConnectors.toArray(new TransportConfiguration[0]));
+          }
+       }
+    }
+
+    e.fillInStackTrace();
+ }
+
+ /**
+ * Creates a locator from a discovery group configuration.
+ * <p>
+ * When useHA is true a new Topology is created for this locator and, once the delegated
+ * constructor returns, setOwner(this) is called on the locator's topology. When useHA is
+ * false a null topology is passed on.
+ *
+ * @param useHA whether a new Topology is created and owned by this locator
+ * @param groupConfiguration discovery group configuration passed to the delegated constructor
+ */
+ public StaticServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ getTopology().setOwner(this);
+ }
+ }
+
+ /**
+ * Creates a locator from an explicit list of transport configurations.
+ * <p>
+ * No discovery group configuration is supplied: null is passed to the delegated constructor.
+ * When useHA is true a new Topology is created for this locator and setOwner(this) is called
+ * on the locator's topology afterwards; when useHA is false a null topology is passed on.
+ *
+ * @param useHA whether a new Topology is created and owned by this locator
+ * @param transportConfigs transport configurations passed to the delegated constructor
+ */
+ public StaticServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
+ {
+ this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ getTopology().setOwner(this);
+ }
+ }
+
+ /**
+ * Creates a locator from a discovery group configuration, using a caller-supplied topology.
+ * <p>
+ * This constructor does not call setOwner on the topology; it only delegates.
+ *
+ * @param topology topology passed to the delegated constructor
+ * @param useHA passed to the delegated constructor
+ * @param groupConfiguration discovery group configuration passed to the delegated constructor
+ */
+ public StaticServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(topology, useHA, groupConfiguration, null);
+
+ }
+
+ /**
+ * Creates a locator from an explicit list of transport configurations, using a
+ * caller-supplied topology.
+ * <p>
+ * No discovery group configuration is supplied: null is passed to the delegated constructor.
+ * This constructor does not call setOwner on the topology.
+ *
+ * @param topology topology passed to the delegated constructor
+ * @param useHA passed to the delegated constructor
+ * @param transportConfigs transport configurations passed to the delegated constructor
+ */
+ public StaticServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final TransportConfiguration... transportConfigs)
+ {
+ this(topology, useHA, null, transportConfigs);
+ }
+
+ /**
+  * Connects through the static connector, registers the resulting factory with this
+  * locator via addFactory, and returns it.
+  *
+  * @return the factory produced by the static connector (may be null, see StaticConnector.connect)
+  * @throws Exception whatever the static connector raises
+  */
+ @Override
+ public ClientSessionFactoryInternal connect() throws Exception
+ {
+    final ClientSessionFactory connected = staticConnector.connect();
+    final ClientSessionFactoryInternal internalFactory = (ClientSessionFactoryInternal)connected;
+    addFactory(internalFactory);
+    return internalFactory;
+ }
+
+ /**
+ * No-op in this implementation: the method body is intentionally empty.
+ */
+ @Override
+ protected void waitInitialDiscovery()
+ {
+ // Nothing for this ServerLocator
+ }
+
+ /**
+ * Close hook: asks the static connector to disconnect every connector it currently holds.
+ */
+ @Override
+ protected void doCloseInternal()
+ {
+ staticConnector.disconnect();
+ }
+
+ private final class StaticConnector implements Serializable
+ {
+ private static final long serialVersionUID = 6772279632415242634l;
+
+ private List<Connector> connectors;
+
+ /**
+  * Tries each configured connector in turn until one yields a session factory.
+  * <p>
+  * The connector list is rebuilt on every call. Rounds over the list are repeated, sleeping
+  * for the retry interval between rounds, while the locator stays initialized. When the
+  * initial connect attempts value is non-negative, the loop stops once the round count
+  * exceeds it; a negative value means no limit.
+  *
+  * @return the connected factory, or null when the locator stopped being initialized before
+  *         any connector succeeded
+  * @throws IllegalStateException if the locator is already closed
+  * @throws HornetQException with code NOT_CONNECTED when no connector could be reached, or
+  *         when the attempt failed or was interrupted
+  */
+ public ClientSessionFactory connect() throws HornetQException
+ {
+    if (isClosed())
+    {
+       throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+    }
+
+    initialise();
+
+    ClientSessionFactory csf = null;
+
+    createConnectors();
+
+    try
+    {
+       int retryNumber = 0;
+       while (csf == null && isInitialized())
+       {
+          retryNumber++;
+          for (Connector conn : connectors)
+          {
+             if (log.isDebugEnabled())
+             {
+                log.debug(this + "::Submitting connect towards " + conn);
+             }
+
+             csf = conn.tryConnect();
+
+             if (csf != null)
+             {
+                csf.getConnection().addFailureListener(new FailureListener()
+                {
+                   // Case the node where the cluster connection was connected is gone, we need to restart the
+                   // connection
+                   public void connectionFailed(HornetQException exception, boolean failedOver)
+                   {
+                      if (isClusterConnection() && exception.getCode() == HornetQException.DISCONNECTED)
+                      {
+                         try
+                         {
+                            StaticServerLocatorImpl.this.start(getExecutor());
+                         }
+                         catch (Exception restartFailure)
+                         {
+                            // There isn't much to be done if this happens here, but keep the
+                            // stack trace so the failure can be diagnosed.
+                            log.warn(restartFailure.getMessage(), restartFailure);
+                         }
+                      }
+                   }
+                });
+
+                if (log.isDebugEnabled())
+                {
+                   log.debug("Returning " + csf +
+                             " after " +
+                             retryNumber +
+                             " retries on StaticConnector " +
+                             StaticServerLocatorImpl.this);
+                }
+
+                return csf;
+             }
+          }
+
+          if (getInitialConnectAttempts() >= 0 && retryNumber > getInitialConnectAttempts())
+          {
+             break;
+          }
+
+          if (isInitialized())
+          {
+             Thread.sleep(getRetryInterval());
+          }
+       }
+    }
+    catch (Exception e)
+    {
+       if (e instanceof InterruptedException)
+       {
+          // Thread.sleep cleared the interrupt status when it threw; restore it so callers
+          // further up the stack can still observe the interruption.
+          Thread.currentThread().interrupt();
+       }
+       log.warn(e.getMessage(), e);
+       throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+    }
+
+    if (csf == null && isInitialized())
+    {
+       log.warn("Failed to connecto to any static connector, throwing exception now");
+       throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+    }
+    if (log.isDebugEnabled())
+    {
+       log.debug("Returning " + csf + " on " + StaticServerLocatorImpl.this);
+    }
+    return csf;
+ }
+
+ /**
+  * Disconnects any connectors left from a previous call, then builds a fresh connector,
+  * backed by a new ClientSessionFactoryImpl, for each static transport configuration.
+  * assertOpen() is checked before each factory is created.
+  */
+ private synchronized void createConnectors()
+ {
+    final List<Connector> previous = connectors;
+    if (previous != null)
+    {
+       for (Connector stale : previous)
+       {
+          if (stale == null)
+          {
+             continue;
+          }
+          stale.disconnect();
+       }
+    }
+
+    connectors = new ArrayList<Connector>();
+
+    for (TransportConfiguration config : getStaticTransportConfigurations())
+    {
+       assertOpen();
+
+       final ClientSessionFactoryInternal sessionFactory =
+          new ClientSessionFactoryImpl(StaticServerLocatorImpl.this,
+                                       config,
+                                       getCallTimeout(),
+                                       getClientFailureCheckPeriod(),
+                                       getConnectionTTL(),
+                                       getRetryInterval(),
+                                       getRetryIntervalMultiplier(),
+                                       getMaxRetryInterval(),
+                                       getReconnectAttempts(),
+                                       getThreadPool(),
+                                       getScheduledThreadPool(),
+                                       getInterceptors());
+
+       // These factories are managed by the locator, so their own finalize check is switched off.
+       sessionFactory.disableFinalizeCheck();
+
+       connectors.add(new Connector(config, sessionFactory));
+    }
+ }
+
+ /**
+  * Disconnects every connector currently held; does nothing when no connector list
+  * has been created yet.
+  */
+ public synchronized void disconnect()
+ {
+    if (connectors == null)
+    {
+       return;
+    }
+
+    for (Connector current : connectors)
+    {
+       current.disconnect();
+    }
+ }
+
+ /**
+ * Safety net for locators that were never closed explicitly.
+ * <p>
+ * When the locator state is not CLOSED and the finalize check is enabled, this logs two
+ * warnings (the second carries the exception held in the enclosing locator's e field),
+ * runs the static finalizeCallback when one is set, and then calls close().
+ * super.finalize() is always invoked afterwards.
+ * <p>
+ * NOTE(review): this finalizer is declared on the inner StaticConnector, so "this" in the
+ * identityHashCode call is the connector, not the locator - confirm that is intended.
+ */
+ @Override
+ public void finalize() throws Throwable
+ {
+ if (getState() != STATE.CLOSED && isFinalizeCheckEnabled())
+ {
+ log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
+ System.identityHashCode(this));
+
+ log.warn("The ServerLocator you didn't close was created here:", e);
+
+ // Hook documented on the field as being for test assertions.
+ if (StaticServerLocatorImpl.finalizeCallback != null)
+ {
+ StaticServerLocatorImpl.finalizeCallback.run();
+ }
+
+ close();
+ }
+
+ super.finalize();
+ }
+
+ /**
+  * Pairs one transport configuration with the session factory used to reach it.
+  */
+ private class Connector
+ {
+    private final TransportConfiguration initialConnector;
+
+    // Set to null by disconnect(); both methods below work on a single snapshot of it.
+    private volatile ClientSessionFactoryInternal factory;
+
+    Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
+    {
+       this.initialConnector = initialConnector;
+       this.factory = factory;
+    }
+
+    /**
+     * Makes a single connection attempt with this connector's factory.
+     *
+     * @return the factory when the attempt succeeded; null when the factory had already been
+     *         cleared by disconnect() or the attempt raised a HornetQException
+     */
+    private ClientSessionFactory tryConnect() throws HornetQException
+    {
+       if (log.isDebugEnabled())
+       {
+          log.debug(this + "::Trying to connect to " + factory);
+       }
+       try
+       {
+          ClientSessionFactoryInternal factoryToUse = factory;
+          if (factoryToUse != null)
+          {
+             addToConnecting(factoryToUse);
+
+             try
+             {
+                factoryToUse.connect(1, false);
+             }
+             finally
+             {
+                // Always undo the registration, whether or not the attempt succeeded.
+                removeFromConnecting(factoryToUse);
+             }
+          }
+          return factoryToUse;
+       }
+       catch (HornetQException e)
+       {
+          log.debug(this + "::Exception on establish connector initial connection", e);
+          return null;
+       }
+    }
+
+    /**
+     * Stops and cleans up the factory, if there still is one, and clears the reference.
+     */
+    public void disconnect()
+    {
+       // Read the volatile field once so the null check and the calls see the same instance.
+       ClientSessionFactoryInternal current = factory;
+       if (current != null)
+       {
+          current.causeExit();
+          current.cleanup();
+          factory = null;
+       }
+    }
+
+    @Override
+    public String toString()
+    {
+       return "Connector [initialConnector=" + initialConnector + "]";
+    }
+
+ }
+ }
+}
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/UDPServerLocatorImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A UDPServerLocatorImpl
+ * @author Tim Fox
+ */
+public class UDPServerLocatorImpl extends AbstractServerLocator
+{
+ private static final long serialVersionUID = -1086825204860145543L;
+
+ private static final Logger log = Logger.getLogger(UDPServerLocatorImpl.class);
+
+ private String discoveryGroupName;
+
+ private InetAddress localBindAddress;
+
+ private InetAddress groupAddress;
+
+ private int groupPort;
+
+ private long refreshTimeout;
+
+ private long initialWaitTimeout;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private final Exception e = new Exception();
+
+ /**
+  * Reads the discovery settings from the discovery group configuration, then creates a
+  * DiscoveryGroupImpl from them, registers this locator as its listener and starts it.
+  *
+  * @throws Exception if an address cannot be resolved or the discovery group fails to start
+  */
+ @Override
+ protected synchronized void initialiseInternal() throws Exception
+ {
+    discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+
+    final Map<String, Object> settings = getDiscoveryGroupConfiguration().getParams();
+
+    // Both addresses are optional: a missing entry leaves the field null.
+    final String bindAddressText =
+       ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, settings);
+    if (bindAddressText == null)
+    {
+       localBindAddress = null;
+    }
+    else
+    {
+       localBindAddress = InetAddress.getByName(bindAddressText);
+    }
+
+    final String groupAddressText =
+       ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, settings);
+    if (groupAddressText == null)
+    {
+       groupAddress = null;
+    }
+    else
+    {
+       groupAddress = InetAddress.getByName(groupAddressText);
+    }
+
+    groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, settings);
+
+    refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME,
+                                                         ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
+                                                         settings);
+
+    initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME,
+                                                             HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
+                                                             settings);
+
+    discoveryGroup = new DiscoveryGroupImpl(getNodeID(),
+                                            discoveryGroupName,
+                                            localBindAddress,
+                                            groupAddress,
+                                            groupPort,
+                                            refreshTimeout);
+
+    discoveryGroup.registerListener(this);
+
+    discoveryGroup.start();
+ }
+
+ /**
+ * Shared constructor that the public constructors delegate to.
+ * Passes every argument to the superclass constructor and then calls fillInStackTrace()
+ * on the exception held in the e field.
+ */
+ private UDPServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration,
+ final TransportConfiguration[] transportConfigs)
+ {
+ super(topology, useHA, discoveryGroupConfiguration, transportConfigs);
+
+ e.fillInStackTrace();
+ }
+
+ /**
+ * Creates a locator from a discovery group configuration.
+ * <p>
+ * When useHA is true a new Topology is created for this locator and, once the delegated
+ * constructor returns, setOwner(this) is called on the locator's topology. When useHA is
+ * false a null topology is passed on.
+ *
+ * @param useHA whether a new Topology is created and owned by this locator
+ * @param groupConfiguration discovery group configuration passed to the delegated constructor
+ */
+ public UDPServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ getTopology().setOwner(this);
+ }
+ }
+
+ /**
+ * Creates a locator from an explicit list of transport configurations.
+ * <p>
+ * No discovery group configuration is supplied: null is passed to the delegated constructor.
+ * When useHA is true a new Topology is created for this locator and setOwner(this) is called
+ * on the locator's topology afterwards; when useHA is false a null topology is passed on.
+ *
+ * @param useHA whether a new Topology is created and owned by this locator
+ * @param transportConfigs transport configurations passed to the delegated constructor
+ */
+ public UDPServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
+ {
+ this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ getTopology().setOwner(this);
+ }
+ }
+
+ /**
+ * Creates a locator from a discovery group configuration, using a caller-supplied topology.
+ * <p>
+ * This constructor does not call setOwner on the topology; it only delegates.
+ *
+ * @param topology topology passed to the delegated constructor
+ * @param useHA passed to the delegated constructor
+ * @param groupConfiguration discovery group configuration passed to the delegated constructor
+ */
+ public UDPServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(topology, useHA, groupConfiguration, null);
+
+ }
+
+ /**
+ * Creates a locator from an explicit list of transport configurations, using a
+ * caller-supplied topology.
+ * <p>
+ * No discovery group configuration is supplied: null is passed to the delegated constructor.
+ * This constructor does not call setOwner on the topology.
+ *
+ * @param topology topology passed to the delegated constructor
+ * @param useHA passed to the delegated constructor
+ * @param transportConfigs transport configurations passed to the delegated constructor
+ */
+ public UDPServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final TransportConfiguration... transportConfigs)
+ {
+ this(topology, useHA, null, transportConfigs);
+ }
+
+ /**
+ * Delegates to createSessionFactory() and returns the result cast to
+ * ClientSessionFactoryInternal.
+ */
+ @Override
+ public ClientSessionFactoryInternal connect() throws Exception
+ {
+ return (ClientSessionFactoryInternal)createSessionFactory();
+ }
+
+ /**
+  * Stops the discovery group, if one was created.
+  * <p>
+  * The group is only instantiated in initialiseInternal(), so the field is still null when
+  * the locator is closed without ever having been initialised; in that case there is
+  * nothing to stop. A failure while stopping is logged rather than propagated.
+  */
+ @Override
+ protected void doCloseInternal()
+ {
+    final DiscoveryGroup group = discoveryGroup;
+    if (group == null)
+    {
+       return;
+    }
+
+    try
+    {
+       group.stop();
+    }
+    catch (Exception e)
+    {
+       log.error("Failed to stop discovery group", e);
+    }
+ }
+
+ /**
+  * Blocks until the discovery group reports an initial broadcast.
+  * <p>
+  * A timeout of 0 is passed for cluster connections; otherwise the configured initial wait
+  * timeout is used.
+  *
+  * @throws HornetQException with code CONNECTION_TIMEDOUT when waitForBroadcast returns false
+  */
+ @Override
+ protected void waitInitialDiscovery() throws Exception
+ {
+    // Wait for an initial broadcast to give us at least one node in the cluster
+    final long waitMillis;
+    if (isClusterConnection())
+    {
+       waitMillis = 0;
+    }
+    else
+    {
+       waitMillis = initialWaitTimeout;
+    }
+
+    if (discoveryGroup.waitForBroadcast(waitMillis))
+    {
+       return;
+    }
+
+    throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                               "Timed out waiting to receive initial broadcast from cluster");
+ }
+
+}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConfiguration.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConfiguration.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -15,14 +15,16 @@
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
/**
* A BroadcastGroupConfiguration
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 18 Nov 2008 08:44:30
*
*/
@@ -34,34 +36,21 @@
private String name;
- private String localBindAddress;
+ private final String broadcastGroupClassName;
- private int localBindPort;
+ private final Map<String, Object> params;
- private String groupAddress;
+ private final List<TransportConfiguration> connectorList;
- private int groupPort;
-
- private long broadcastPeriod;
-
- private List<String> connectorInfos;
-
public BroadcastGroupConfiguration(final String name,
- final String localBindAddress,
- final int localBindPort,
- final String groupAddress,
- final int groupPort,
- final long broadcastPeriod,
- final List<String> connectorInfos)
+ final String clazz, final Map<String, Object> params,
+ final List<TransportConfiguration> connectors)
{
super();
this.name = name;
- this.localBindAddress = localBindAddress;
- this.localBindPort = localBindPort;
- this.groupAddress = groupAddress;
- this.groupPort = groupPort;
- this.broadcastPeriod = broadcastPeriod;
- this.connectorInfos = connectorInfos;
+ this.broadcastGroupClassName = clazz;
+ this.params = params;
+ this.connectorList = connectors;
}
public String getName()
@@ -69,36 +58,21 @@
return name;
}
- public String getLocalBindAddress()
+ public String getBroadcastGroupClassName()
{
- return localBindAddress;
+ return broadcastGroupClassName;
}
- public int getLocalBindPort()
+ public Map<String, Object> getParams()
{
- return localBindPort;
+ return params;
}
- public String getGroupAddress()
+ public List<TransportConfiguration> getConnectorList()
{
- return groupAddress;
+ return connectorList;
}
- public int getGroupPort()
- {
- return groupPort;
- }
-
- public long getBroadcastPeriod()
- {
- return broadcastPeriod;
- }
-
- public List<String> getConnectorInfos()
- {
- return connectorInfos;
- }
-
/**
* @param name the name to set
*/
@@ -106,53 +80,4 @@
{
this.name = name;
}
-
- /**
- * @param localBindAddress the localBindAddress to set
- */
- public void setLocalBindAddress(final String localBindAddress)
- {
- this.localBindAddress = localBindAddress;
- }
-
- /**
- * @param localBindPort the localBindPort to set
- */
- public void setLocalBindPort(final int localBindPort)
- {
- this.localBindPort = localBindPort;
- }
-
- /**
- * @param groupAddress the groupAddress to set
- */
- public void setGroupAddress(final String groupAddress)
- {
- this.groupAddress = groupAddress;
- }
-
- /**
- * @param groupPort the groupPort to set
- */
- public void setGroupPort(final int groupPort)
- {
- this.groupPort = groupPort;
- }
-
- /**
- * @param broadcastPeriod the broadcastPeriod to set
- */
- public void setBroadcastPeriod(final long broadcastPeriod)
- {
- this.broadcastPeriod = broadcastPeriod;
- }
-
- /**
- * @param connectorInfos the connectorInfos to set
- */
- public void setConnectorInfos(final List<String> connectorInfos)
- {
- this.connectorInfos = connectorInfos;
- }
-
}
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.config;
+
+/**
+ * String constants naming the settings of a simple UDP broadcast group.
+ * <p>
+ * NOTE(review): presumably used as keys into a broadcast group's params map - confirm
+ * against callers.
+ *
+ * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
+ */
+public class BroadcastGroupConstants
+{
+ // for simple UDP broadcast
+ public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+ public static final String LOCAL_BIND_PORT_NAME = "local-bind-port";
+ public static final String GROUP_ADDRESS_NAME = "group-address";
+ public static final String GROUP_PORT_NAME = "group-port";
+ public static final String BROADCAST_PERIOD_NAME = "broadcast-period";
+}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -16,6 +16,8 @@
import java.io.Serializable;
import java.util.List;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -23,7 +25,7 @@
* A ClusterConnectionConfiguration
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 13 Jan 2009 09:42:17
*
*
@@ -35,114 +37,47 @@
private final String name;
private final String address;
-
+
private final String connectorName;
private final long clientFailureCheckPeriod;
-
+
private final long connectionTTL;
-
+
private final long retryInterval;
-
+
private final double retryIntervalMultiplier;
-
+
private final long maxRetryInterval;
-
+
private final int reconnectAttempts;
-
+
private final long callTimeout;
private final boolean duplicateDetection;
private final boolean forwardWhenNoConsumers;
- private final List<String> staticConnectors;
+ private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
- private final String discoveryGroupName;
-
private final int maxHops;
private final int confirmationWindowSize;
private final boolean allowDirectConnectionsOnly;
-
- public ClusterConnectionConfiguration(final String name,
- final String address,
- final String connectorName,
- final long retryInterval,
- final boolean duplicateDetection,
- final boolean forwardWhenNoConsumers,
- final int maxHops,
- final int confirmationWindowSize,
- final List<String> staticConnectors,
- final boolean allowDirectConnectionsOnly)
- {
- this(name,
- address,
- connectorName,
- ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
- ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
- retryInterval,
- ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
- ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
- ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- duplicateDetection,
- forwardWhenNoConsumers,
- maxHops,
- confirmationWindowSize,
- staticConnectors,
- allowDirectConnectionsOnly);
- }
+ private final List<TransportConfiguration> directConnectors;
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
- final long clientFailureCheckPeriod,
- final long connectionTTL,
final long retryInterval,
- final double retryIntervalMultiplier,
- final long maxRetryInterval,
- final int reconnectAttempts,
- final long callTimeout,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
final int confirmationWindowSize,
- final List<String> staticConnectors,
- final boolean allowDirectConnectionsOnly)
+ final DiscoveryGroupConfiguration discoveryGroupConfig)
{
- this.name = name;
- this.address = address;
- this.connectorName = connectorName;
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- this.connectionTTL = connectionTTL;
- this.retryInterval = retryInterval;
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- this.maxRetryInterval = maxRetryInterval;
- this.reconnectAttempts = reconnectAttempts;
- this.staticConnectors = staticConnectors;
- this.duplicateDetection = duplicateDetection;
- this.callTimeout = callTimeout;
- this.forwardWhenNoConsumers = forwardWhenNoConsumers;
- discoveryGroupName = null;
- this.maxHops = maxHops;
- this.confirmationWindowSize = confirmationWindowSize;
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
- }
-
-
- public ClusterConnectionConfiguration(final String name,
- final String address,
- final String connectorName,
- final long retryInterval,
- final boolean duplicateDetection,
- final boolean forwardWhenNoConsumers,
- final int maxHops,
- final int confirmationWindowSize,
- final String discoveryGroupName)
- {
this(name,
address,
connectorName,
@@ -157,7 +92,8 @@
forwardWhenNoConsumers,
maxHops,
confirmationWindowSize,
- discoveryGroupName);
+ discoveryGroupConfig,
+ false, null);
}
@@ -175,7 +111,9 @@
final boolean forwardWhenNoConsumers,
final int maxHops,
final int confirmationWindowSize,
- final String discoveryGroupName)
+ final DiscoveryGroupConfiguration discoveryGroupConfig,
+ boolean allowDirectConnectionsOnly,
+ final List<TransportConfiguration> directConnectors)
{
this.name = name;
this.address = address;
@@ -189,11 +127,11 @@
this.callTimeout = callTimeout;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
- this.discoveryGroupName = discoveryGroupName;
- this.staticConnectors = null;
+ this.discoveryGroupConfiguration = discoveryGroupConfig;
this.maxHops = maxHops;
this.confirmationWindowSize = confirmationWindowSize;
- allowDirectConnectionsOnly = false;
+ this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+ this.directConnectors = directConnectors;
}
public String getName()
@@ -205,7 +143,7 @@
{
return address;
}
-
+
/**
* @return the clientFailureCheckPeriod
*/
@@ -245,12 +183,12 @@
{
return reconnectAttempts;
}
-
+
public long getCallTimeout()
{
return callTimeout;
}
-
+
public String getConnectorName()
{
return connectorName;
@@ -276,14 +214,14 @@
return confirmationWindowSize;
}
- public List<String> getStaticConnectors()
+ public List<TransportConfiguration> getAllowedConnectors()
{
- return staticConnectors;
+ return directConnectors;
}
- public String getDiscoveryGroupName()
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
{
- return discoveryGroupName;
+ return discoveryGroupConfiguration;
}
public long getRetryInterval()
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -22,13 +22,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.StringTokenizer;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.config.*;
+import org.hornetq.core.config.BridgeConfiguration;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.ConnectorServiceConfiguration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.config.impl.Validators;
@@ -128,7 +136,16 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ public static enum DiscoveryType
+ {
+ STATIC, UDP, JGROUPS
+ };
+ public static enum BroadcastType
+ {
+ UDP, JGROUPS
+ };
+
/**
* @return the validateAIO
*/
@@ -901,22 +918,32 @@
{
String name = e.getAttribute("name");
- String localAddress = XMLConfigurationUtil.getString(e, "local-bind-address", null, Validators.NO_CHECK);
+ String type = XMLConfigurationUtil.getString(e, "broadcast-type", null, Validators.NOT_NULL_OR_EMPTY);
+ String clazz = null;
+ try
+ {
+ switch (BroadcastType.valueOf(type))
+ {
+ case UDP:
+ clazz = "org.hornetq.core.server.cluster.impl.BroadcastGroupImpl";
+ break;
+ case JGROUPS:
+ clazz = "org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl";
+ break;
+ default:
+ throw new RuntimeException("BUG: broadcast-type=" + type +
+ " must be processed in FileConfigurationParser#parseBroadcastGroupConfiguration().");
+ }
+ }
+ catch (IllegalArgumentException ex)
+ { // BroadcastType.valueOf() rejected the configured value: list the accepted names and skip this group
+ log.warn("broadcast-type=" + type + " is unsupported. It must be one of " + java.util.Arrays.toString(BroadcastType.values()));
+ return;
+ }
- int localBindPort = XMLConfigurationUtil.getInteger(e, "local-bind-port", -1, Validators.MINUS_ONE_OR_GT_ZERO);
-
- String groupAddress = XMLConfigurationUtil.getString(e, "group-address", null, Validators.NOT_NULL_OR_EMPTY);
-
- int groupPort = XMLConfigurationUtil.getInteger(e, "group-port", -1, Validators.GT_ZERO);
-
- long broadcastPeriod = XMLConfigurationUtil.getLong(e,
- "broadcast-period",
- ConfigurationImpl.DEFAULT_BROADCAST_PERIOD,
- Validators.GT_ZERO);
-
NodeList children = e.getChildNodes();
- List<String> connectorNames = new ArrayList<String>();
+ List<TransportConfiguration> connectorList = new ArrayList<TransportConfiguration>();
for (int j = 0; j < children.getLength(); j++)
{
@@ -929,18 +956,30 @@
null,
Validators.NOT_NULL_OR_EMPTY);
- connectorNames.add(connectorName);
+ connectorList.add(mainConfig.getConnectorConfigurations().get(connectorName));
}
}
+ Map<String, Object> params = new HashMap<String, Object>();
- BroadcastGroupConfiguration config = new BroadcastGroupConfiguration(name,
- localAddress,
- localBindPort,
- groupAddress,
- groupPort,
- broadcastPeriod,
- connectorNames);
+ NodeList paramsNodes = e.getElementsByTagName("param");
+ for (int i = 0; i < paramsNodes.getLength(); i++)
+ {
+ Node paramNode = paramsNodes.item(i);
+
+ NamedNodeMap attributes = paramNode.getAttributes();
+
+ Node nkey = attributes.getNamedItem("key");
+
+ String key = nkey.getTextContent();
+
+ Node nValue = attributes.getNamedItem("value");
+
+ params.put(key, nValue.getTextContent());
+ }
+
+ BroadcastGroupConfiguration config = new BroadcastGroupConfiguration(name, clazz, params, connectorList);
+
mainConfig.getBroadcastGroupConfigurations().add(config);
}
@@ -948,33 +987,73 @@
{
String name = e.getAttribute("name");
- String localBindAddress = XMLConfigurationUtil.getString(e, "local-bind-address", null, Validators.NO_CHECK);
+ String type = XMLConfigurationUtil.getString(e, "discovery-type", null, Validators.NOT_NULL_OR_EMPTY);
+ String serverLocatorClassName, clusterConnectorClassName = null;
+ try
+ {
+ switch (DiscoveryType.valueOf(type))
+ {
+ case STATIC:
+ serverLocatorClassName = "org.hornetq.core.client.impl.StaticServerLocatorImpl";
+ clusterConnectorClassName = "org.hornetq.core.server.cluster.impl.StaticClusterConnectorImpl";
+ break;
+ case UDP:
+ serverLocatorClassName = "org.hornetq.core.client.impl.UDPServerLocatorImpl";
+ clusterConnectorClassName = "org.hornetq.core.server.cluster.impl.UDPDiscoveryClusterConnectorImpl";
+ break;
+ case JGROUPS:
+ serverLocatorClassName = "org.hornetq.integration.discovery.jgroups.JGroupsServerLocatorImpl";
+ clusterConnectorClassName = "org.hornetq.core.server.cluster.impl.JGroupsDiscoveryClusterConnectorImpl";
+ break;
+ default:
+ throw new RuntimeException("BUG: discovery-type=" + type +
+ " must be processed in FileConfigurationParser#parseDiscoveryGroupConfiguration().");
+ }
+ }
+ catch (IllegalArgumentException ex)
+ { // DiscoveryType.valueOf() rejected the configured value: list the accepted names and skip this group
+ log.warn("discovery-type=" + type + " is unsupported. It must be one of " + java.util.Arrays.toString(DiscoveryType.values()));
+ return;
+ }
- String groupAddress = XMLConfigurationUtil.getString(e, "group-address", null, Validators.NOT_NULL_OR_EMPTY);
+ Map<String, Object> params = new HashMap<String, Object>();
- int groupPort = XMLConfigurationUtil.getInteger(e, "group-port", -1, Validators.MINUS_ONE_OR_GT_ZERO);
+ NodeList paramsNodes = e.getElementsByTagName("param");
- long discoveryInitialWaitTimeout = XMLConfigurationUtil.getLong(e,
- "initial-wait-timeout",
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- Validators.GT_ZERO);
+ for (int i = 0; i < paramsNodes.getLength(); i++)
+ {
+ Node paramNode = paramsNodes.item(i);
- long refreshTimeout = XMLConfigurationUtil.getLong(e,
- "refresh-timeout",
- ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
- Validators.GT_ZERO);
+ NamedNodeMap attributes = paramNode.getAttributes();
- DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(name,
- localBindAddress,
- groupAddress,
- groupPort,
- refreshTimeout,
- discoveryInitialWaitTimeout);
+ Node nkey = attributes.getNamedItem("key");
+ String key = nkey.getTextContent();
+
+ Node nValue = attributes.getNamedItem("value");
+
+ params.put(key, nValue.getTextContent());
+ }
+
+ String connectorList = (String)params.get(DiscoveryGroupConstants.STATIC_CONNECTOR_NAMES_NAME);
+ if (connectorList != null)
+ {
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ StringTokenizer token = new StringTokenizer(connectorList, ",", false);
+ while (token.hasMoreTokens())
+ { // trim each name so "a, b" resolves connector "b" rather than looking up " b" and adding null
+ connectors.add(mainConfig.getConnectorConfigurations().get(token.nextToken().trim()));
+ }
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME, connectors);
+ }
+
+ DiscoveryGroupConfiguration config =
+ new DiscoveryGroupConfiguration(serverLocatorClassName, clusterConnectorClassName, params, name);
+
if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name))
{
FileConfigurationParser.log.warn("There is already a discovery group with name " + name +
- " deployed. This one will not be deployed.");
+ " deployed. This one will not be deployed.");
return;
}
@@ -1004,7 +1083,7 @@
"max-hops",
ConfigurationImpl.DEFAULT_CLUSTER_MAX_HOPS,
Validators.GE_ZERO);
-
+
long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(e, "check-period",
ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD, Validators.GT_ZERO) ;
@@ -1016,14 +1095,14 @@
"retry-interval",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
Validators.GT_ZERO);
-
+
long callTimeout = XMLConfigurationUtil.getLong(e, "call-timeout", HornetQClient.DEFAULT_CALL_TIMEOUT, Validators.GT_ZERO);
-
- double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier",
+
+ double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
-
+
long maxRetryInterval = XMLConfigurationUtil.getLong(e, "max-retry-interval", ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
-
+
int reconnectAttempts = XMLConfigurationUtil.getInteger(e, "reconnect-attempts", ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS, Validators.MINUS_ONE_OR_GE_ZERO);
@@ -1034,7 +1113,7 @@
String discoveryGroupName = null;
- List<String> staticConnectorNames = new ArrayList<String>();
+ List<TransportConfiguration> directConnections = null;
boolean allowDirectConnectionsOnly = false;
@@ -1047,57 +1126,39 @@
if (child.getNodeName().equals("discovery-group-ref"))
{
discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
- }
- else if (child.getNodeName().equals("static-connectors"))
- {
+
Node attr = child.getAttributes().getNamedItem("allow-direct-connections-only");
if (attr != null)
{
allowDirectConnectionsOnly = "true".equalsIgnoreCase(attr.getNodeValue()) || allowDirectConnectionsOnly;
+ directConnections =
+ (List<TransportConfiguration>)mainConfig.getDiscoveryGroupConfigurations()
+ .get(discoveryGroupName)
+ .getParams()
+ .get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
}
- getStaticConnectors(staticConnectorNames, child);
}
}
ClusterConnectionConfiguration config;
- if (discoveryGroupName == null)
- {
- config = new ClusterConnectionConfiguration(name,
- address,
- connectorName,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- callTimeout,
- duplicateDetection,
- forwardWhenNoConsumers,
- maxHops,
- confirmationWindowSize,
- staticConnectorNames,
- allowDirectConnectionsOnly);
- }
- else
- {
- config = new ClusterConnectionConfiguration(name,
- address,
- connectorName,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- callTimeout,
- duplicateDetection,
- forwardWhenNoConsumers,
- maxHops,
- confirmationWindowSize,
- discoveryGroupName);
- }
+ config = new ClusterConnectionConfiguration(name,
+ address,
+ connectorName,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ callTimeout,
+ duplicateDetection,
+ forwardWhenNoConsumers,
+ maxHops,
+ confirmationWindowSize,
+ mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName),
+ allowDirectConnectionsOnly,
+ directConnections);
mainConfig.getClusterConfigurations().add(config);
}
@@ -1147,10 +1208,10 @@
long connectionTTL = XMLConfigurationUtil.getLong(brNode, "connection-ttl",
HornetQClient.DEFAULT_CONNECTION_TTL, Validators.GT_ZERO) ;
-
+
long maxRetryInterval = XMLConfigurationUtil.getLong(brNode, "max-retry-interval", HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
-
+
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(brNode,
"retry-interval-multiplier",
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -15,17 +15,20 @@
import javax.management.MBeanOperationInfo;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.BroadcastGroupControl;
import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.BroadcastGroupConstants;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.json.JSONArray;
/**
* A BroadcastGroupControl
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
+ *
* Created 11 dec. 2008 17:09:04
*/
public class BroadcastGroupControlImpl extends AbstractControl implements BroadcastGroupControl
@@ -72,7 +75,9 @@
clearIO();
try
{
- return configuration.getBroadcastPeriod();
+ return ConfigurationHelper.getLongProperty(BroadcastGroupConstants.BROADCAST_PERIOD_NAME,
+ -1,
+ configuration.getParams());
}
finally
{
@@ -85,10 +90,10 @@
clearIO();
try
{
- Object[] ret = new Object[configuration.getConnectorInfos().size()];
+ Object[] ret = new Object[configuration.getConnectorList().size()];
int i = 0;
- for (String connector : configuration.getConnectorInfos())
+ for (TransportConfiguration connector : configuration.getConnectorList())
{
ret[i++] = connector;
}
@@ -108,7 +113,7 @@
{
JSONArray array = new JSONArray();
- for (String connector : configuration.getConnectorInfos())
+ for (TransportConfiguration connector : configuration.getConnectorList())
{
array.put(connector);
}
@@ -125,7 +130,9 @@
clearIO();
try
{
- return configuration.getGroupAddress();
+ return ConfigurationHelper.getStringProperty(BroadcastGroupConstants.GROUP_ADDRESS_NAME,
+ null,
+ configuration.getParams());
}
finally
{
@@ -138,7 +145,9 @@
clearIO();
try
{
- return configuration.getGroupPort();
+ return ConfigurationHelper.getIntProperty(BroadcastGroupConstants.GROUP_PORT_NAME,
+ -1,
+ configuration.getParams());
}
finally
{
@@ -151,7 +160,9 @@
clearIO();
try
{
- return configuration.getLocalBindPort();
+ return ConfigurationHelper.getIntProperty(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME,
+ -1,
+ configuration.getParams());
}
finally
{
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -18,10 +18,13 @@
import javax.management.MBeanOperationInfo;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ClusterConnectionControl;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.json.JSONArray;
/**
@@ -74,7 +77,7 @@
clearIO();
try
{
- return configuration.getDiscoveryGroupName();
+ return configuration.getDiscoveryGroupConfiguration().getName();
}
finally
{
@@ -143,14 +146,23 @@
clearIO();
try
{
- if (configuration.getStaticConnectors() == null)
+ List<TransportConfiguration> connectors =
+ (List<TransportConfiguration>)configuration
+ .getDiscoveryGroupConfiguration()
+ .getParams()
+ .get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
+ if (connectors == null)
{
return null;
}
- else
+
+ String[] array = new String[connectors.size()]; // one string form per configured connector
+
+ for (int i=0; i<connectors.size(); i++)
{
- return configuration.getStaticConnectors().toArray(new String[0]);
+ array[i] = connectors.get(i).toString();
}
+ return array;
}
finally
{
@@ -163,7 +175,11 @@
clearIO();
try
{
- List<String> connectors = configuration.getStaticConnectors();
+ List<TransportConfiguration> connectors =
+ (List<TransportConfiguration>)configuration
+ .getDiscoveryGroupConfiguration()
+ .getParams()
+ .get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
if (connectors == null)
{
@@ -172,9 +188,9 @@
JSONArray array = new JSONArray();
- for (String connector : connectors)
+ for (TransportConfiguration connector : connectors)
{
- array.put(connector);
+ array.put(connector.toString());
}
return array.toString();
}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/BroadcastGroup.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/BroadcastGroup.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/BroadcastGroup.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -13,6 +13,8 @@
package org.hornetq.core.server.cluster;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.management.NotificationService;
@@ -22,7 +24,7 @@
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
+ *
* Created 18 Nov 2008 09:29:45
*
*
@@ -42,4 +44,6 @@
void broadcastConnectors() throws Exception;
void activate();
+
+ void schedule(ScheduledExecutorService executor);
}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -16,19 +16,24 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.BroadcastGroupConstants;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -36,7 +41,7 @@
* A BroadcastGroupImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 15 Nov 2008 09:45:32
*
*/
@@ -48,17 +53,11 @@
private final String name;
- private final InetAddress localAddress;
+ private final BroadcastGroupConfiguration broadcastGroupConfiguration;
- private final int localPort;
-
- private final InetAddress groupAddress;
-
- private final int groupPort;
-
private DatagramSocket socket;
- private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ private final List<TransportConfiguration> connectors;
private boolean started;
@@ -77,26 +76,19 @@
*/
public BroadcastGroupImpl(final String nodeID,
final String name,
- final InetAddress localAddress,
- final int localPort,
- final InetAddress groupAddress,
- final int groupPort,
- final boolean active) throws Exception
+ final boolean active,
+ final BroadcastGroupConfiguration config) throws Exception
{
this.nodeID = nodeID;
this.name = name;
- this.localAddress = localAddress;
+ this.active = active;
- this.localPort = localPort;
+ this.broadcastGroupConfiguration = config;
- this.groupAddress = groupAddress;
+ this.connectors = config.getConnectorList();
- this.groupPort = groupPort;
-
- this.active = active;
-
uniqueID = UUIDGenerator.getInstance().generateStringUUID();
}
@@ -112,6 +104,17 @@
return;
}
+ Map<String, Object> params = this.broadcastGroupConfiguration.getParams();
+ int localPort = ConfigurationHelper.getIntProperty(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME, -1, params);
+ String localAddr =
+ ConfigurationHelper.getStringProperty(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
+
+ InetAddress localAddress = null;
+ if (localAddr != null)
+ {
+ localAddress = InetAddress.getByName(localAddr);
+ }
+
if (localPort != -1)
{
socket = new DatagramSocket(localPort, localAddress);
@@ -120,7 +123,7 @@
{
if (localAddress != null)
{
- log.warn("local-bind-address specified for broadcast group but no local-bind-port specified so socket will NOT be bound " +
+ log.warn("local-bind-address specified for broadcast group but no local-bind-port specified so socket will NOT be bound " +
"to a local address/port");
}
socket = new DatagramSocket();
@@ -222,8 +225,12 @@
byte[] data = buff.toByteBuffer().array();
- DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
+ Map<String, Object> params = broadcastGroupConfiguration.getParams();
+ int groupPort = ConfigurationHelper.getIntProperty(BroadcastGroupConstants.GROUP_PORT_NAME, -1, params); // same accessor start() uses for the local bind port
+ Object addr = params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME); // FileConfigurationParser stores param values as String
+ InetAddress groupAddr = addr instanceof String ? InetAddress.getByName((String)addr) : (InetAddress)addr;
+ DatagramPacket packet = new DatagramPacket(data, data.length, groupAddr, groupPort);
socket.send(packet);
}
@@ -244,9 +251,13 @@
}
}
- public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
+ public void schedule(ScheduledExecutorService scheduler)
{
- this.future = future;
+ Map<String,Object> params = broadcastGroupConfiguration.getParams(); // broadcast period is read from the group's params
+
+ this.future = scheduler.scheduleWithFixedDelay(this,
+ 0L,
+ ConfigurationHelper.getLongProperty(BroadcastGroupConstants.BROADCAST_PERIOD_NAME, org.hornetq.core.config.impl.ConfigurationImpl.DEFAULT_BROADCAST_PERIOD, params),
+ TimeUnit.MILLISECONDS);
}
-
}
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -18,12 +18,12 @@
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.Arrays;
+import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -39,8 +39,8 @@
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.AfterConnectInternalListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
@@ -77,7 +77,7 @@
*/
public class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener
{
- private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
+ static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
private static final boolean isTrace = log.isTraceEnabled();
@@ -139,7 +139,7 @@
private final boolean allowDirectConnectionsOnly;
- private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
+ private final List<TransportConfiguration> allowableConnections;
private final ClusterManagerInternal manager;
@@ -147,120 +147,11 @@
// Stuff that used to be on the ClusterManager
- private final Topology topology = new Topology(this);
+ final Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
- public ClusterConnectionImpl(final ClusterManagerInternal manager,
- final TransportConfiguration[] tcConfigs,
- final TransportConfiguration connector,
- final SimpleString name,
- final SimpleString address,
- final long clientFailureCheckPeriod,
- final long connectionTTL,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final long maxRetryInterval,
- final int reconnectAttempts,
- final long callTimeout,
- final boolean useDuplicateDetection,
- final boolean routeWhenNoConsumers,
- final int confirmationWindowSize,
- final ExecutorFactory executorFactory,
- final HornetQServer server,
- final PostOffice postOffice,
- final ManagementService managementService,
- final ScheduledExecutorService scheduledExecutor,
- final int maxHops,
- final UUID nodeUUID,
- final boolean backup,
- final String clusterUser,
- final String clusterPassword,
- final boolean allowDirectConnectionsOnly) throws Exception
- {
-
- if (nodeUUID == null)
- {
- throw new IllegalArgumentException("node id is null");
- }
-
- this.nodeUUID = nodeUUID;
-
- this.connector = connector;
-
- this.name = name;
-
- this.address = address;
-
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-
- this.connectionTTL = connectionTTL;
-
- this.retryInterval = retryInterval;
-
- this.retryIntervalMultiplier = retryIntervalMultiplier;
-
- this.maxRetryInterval = maxRetryInterval;
-
- this.reconnectAttempts = reconnectAttempts;
-
- this.useDuplicateDetection = useDuplicateDetection;
-
- this.routeWhenNoConsumers = routeWhenNoConsumers;
-
- this.confirmationWindowSize = confirmationWindowSize;
-
- this.executorFactory = executorFactory;
-
- this.executor = executorFactory.getExecutor();
-
- this.topology.setExecutor(executor);
-
- this.server = server;
-
- this.postOffice = postOffice;
-
- this.managementService = managementService;
-
- this.scheduledExecutor = scheduledExecutor;
-
- this.maxHops = maxHops;
-
- this.backup = backup;
-
- this.clusterUser = clusterUser;
-
- this.clusterPassword = clusterPassword;
-
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-
- this.manager = manager;
-
- this.callTimeout = callTimeout;
-
- clusterConnector = new StaticClusterConnector(tcConfigs);
-
- backupServerLocator = clusterConnector.createServerLocator(false);
-
- if (backupServerLocator != null)
- {
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
- }
-
- if (tcConfigs != null && tcConfigs.length > 0)
- {
- // a cluster connection will connect to other nodes only if they are directly connected
- // through a static list of connectors or broadcasting using UDP.
- if (allowDirectConnectionsOnly)
- {
- allowableConnections.addAll(Arrays.asList(tcConfigs));
- }
- }
-
- }
-
public ClusterConnectionImpl(final ClusterManagerImpl manager,
DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
@@ -286,7 +177,8 @@
final boolean backup,
final String clusterUser,
final String clusterPassword,
- final boolean allowDirectConnectionsOnly) throws Exception
+ final boolean allowDirectConnectionsOnly,
+ final List<TransportConfiguration> allowableConnections) throws Exception
{
if (nodeUUID == null)
@@ -346,8 +238,14 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
- clusterConnector = new DiscoveryClusterConnector(dg);
+ this.allowableConnections = allowableConnections;
+ String className = dg.getClusterConnectorClassName();
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(DiscoveryGroupConfiguration.class);
+ clusterConnector = (ClusterConnector)constructor.newInstance(dg);
+
backupServerLocator = clusterConnector.createServerLocator(false);
if (backupServerLocator != null)
@@ -846,7 +744,7 @@
final Queue queue,
final boolean start) throws Exception
{
- final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false, connector);
+ final ServerLocatorInternal targetLocator = new StaticServerLocatorImpl(topology, false, connector);
String nodeId;
@@ -1502,7 +1400,7 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
"[nodeUUID=" + nodeUUID +
", connector=" +
connector +
@@ -1530,64 +1428,4 @@
return str.toString();
}
-
- interface ClusterConnector
- {
- ServerLocatorInternal createServerLocator(boolean includeTopology);
- }
-
- private class StaticClusterConnector implements ClusterConnector
- {
- private final TransportConfiguration[] tcConfigs;
-
- public StaticClusterConnector(TransportConfiguration[] tcConfigs)
- {
- this.tcConfigs = tcConfigs;
- }
-
- public ServerLocatorInternal createServerLocator(boolean includeTopology)
- {
- if (tcConfigs != null && tcConfigs.length > 0)
- {
- if (log.isDebugEnabled())
- {
- log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
- }
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
- locator.setClusterConnection(true);
- return locator;
- }
- else
- {
- return null;
- }
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString()
- {
- return "StaticClusterConnector [tcConfigs=" + Arrays.toString(tcConfigs) + "]";
- }
-
- }
-
- private class DiscoveryClusterConnector implements ClusterConnector
- {
- private final DiscoveryGroupConfiguration dg;
-
- public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg)
- {
- this.dg = dg;
- }
-
- public ServerLocatorInternal createServerLocator(boolean includeTopology)
- {
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, dg);
- return locator;
-
- }
- }
}
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnector.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnector.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnector.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+
+/**
+ * Factory abstraction through which a cluster connection obtains the
+ * {@link ServerLocatorInternal} it uses to reach other nodes.
+ * <p>
+ * Each implementation wraps one way of locating cluster members (for example a static
+ * connector list or a discovery group).
+ */
+public interface ClusterConnector
+{
+ /**
+ * Creates a server locator for this connector's discovery mechanism.
+ *
+ * @param includeTopology whether the created locator should be handed the owning cluster
+ *        connection's topology; the implementations added with this interface pass
+ *        {@code null} as the topology when this is {@code false}
+ * @return the new locator; may be {@code null} (the static implementation returns
+ *         {@code null} when it has no connectors configured)
+ */
+ ServerLocatorInternal createServerLocator(boolean includeTopology);
+}
\ No newline at end of file
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -13,13 +13,10 @@
package org.hornetq.core.server.cluster.impl;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.util.Arrays;
+import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -27,7 +24,6 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.SimpleString;
@@ -444,46 +440,23 @@
ServerLocatorInternal serverLocator;
- if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
- if (discoveryGroupConfiguration == null)
- {
- ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
+ if (discoveryGroupConfiguration == null)
+ {
+ ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
"'. The bridge will not be deployed.");
- return;
- }
+ return;
+ }
- if (config.isHA())
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
- }
- else
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
- }
-
+ if (config.isHA())
+ {
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
}
else
{
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
- if (tcConfigs == null)
- {
- return;
- }
-
- if (config.isHA())
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
- }
- else
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- }
-
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
}
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
@@ -624,91 +597,41 @@
ClusterConnectionImpl clusterConnection;
- if (config.getDiscoveryGroupName() != null)
+ if (log.isDebugEnabled())
{
- DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
-
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
- "'. The cluster connection will not be deployed.");
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug(this + " Starting a Discovery Group Cluster Connection, name=" +
- config.getDiscoveryGroupName() +
- ", dg=" +
- dg);
- }
-
- clusterConnection = new ClusterConnectionImpl(this,
- dg,
- connector,
- new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getClientFailureCheckPeriod(),
- config.getConnectionTTL(),
- config.getRetryInterval(),
- config.getRetryIntervalMultiplier(),
- config.getMaxRetryInterval(),
- config.getReconnectAttempts(),
- config.getCallTimeout(),
- config.isDuplicateDetection(),
- config.isForwardWhenNoConsumers(),
- config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- config.getMaxHops(),
- nodeUUID,
- backup,
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- config.isAllowDirectConnectionsOnly());
+ log.debug(this + " Starting a Discovery Group Cluster Connection, name=" +
+ config.getDiscoveryGroupConfiguration().getName() + ", dg=" + config.getDiscoveryGroupConfiguration());
}
- else
- {
- TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
- : null;
- if (log.isDebugEnabled())
- {
- log.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
- }
+ clusterConnection =
+ new ClusterConnectionImpl(this,
+ config.getDiscoveryGroupConfiguration(),
+ connector,
+ new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getClientFailureCheckPeriod(),
+ config.getConnectionTTL(),
+ config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getMaxRetryInterval(),
+ config.getReconnectAttempts(),
+ config.getCallTimeout(),
+ config.isDuplicateDetection(),
+ config.isForwardWhenNoConsumers(),
+ config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+ config.getMaxHops(),
+ nodeUUID,
+ backup,
+ server.getConfiguration().getClusterUser(),
+ server.getConfiguration().getClusterPassword(),
+ config.isAllowDirectConnectionsOnly(),
+ config.getAllowedConnectors());
- clusterConnection = new ClusterConnectionImpl(this,
- tcConfigs,
- connector,
- new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getClientFailureCheckPeriod(),
- config.getConnectionTTL(),
- config.getRetryInterval(),
- config.getRetryIntervalMultiplier(),
- config.getMaxRetryInterval(),
- config.getReconnectAttempts(),
- config.getCallTimeout(),
- config.isDuplicateDetection(),
- config.isForwardWhenNoConsumers(),
- config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- config.getMaxHops(),
- nodeUUID,
- backup,
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- config.isAllowDirectConnectionsOnly());
- }
-
if (defaultClusterConnection == null)
{
defaultClusterConnection = clusterConnection;
@@ -756,43 +679,24 @@
return;
}
- InetAddress localAddress = null;
- if (config.getLocalBindAddress() != null)
- {
- localAddress = InetAddress.getByName(config.getLocalBindAddress());
- }
+ String className = config.getBroadcastGroupClassName();
- InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor =
+ clazz.getConstructor(String.class, String.class, boolean.class, BroadcastGroupConfiguration.class);
+ BroadcastGroup group =
+ (BroadcastGroup)constructor.newInstance(nodeUUID.toString(), config.getName(), !backup, config);
- BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
- config.getName(),
- localAddress,
- config.getLocalBindPort(),
- groupAddress,
- config.getGroupPort(),
- !backup);
-
- for (String connectorInfo : config.getConnectorInfos())
+ if (group.size() == 0)
{
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
-
- if (connector == null)
- {
- logWarnNoConnector(config.getName(), connectorInfo);
-
- return;
- }
-
- group.addConnector(connector);
+ ClusterManagerImpl.log.warn("There is no connector deployed for the broadcast group with name '" +
+ group.getName() + "'. That will not be deployed.");
+ return;
}
- ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
- 0L,
- config.getBroadcastPeriod(),
- MILLISECONDS);
+ group.schedule(scheduledExecutor);
- group.setScheduledFuture(future);
-
broadcastGroups.put(config.getName(), group);
managementService.registerBroadcastGroup(group, config);
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/StaticClusterConnectorImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.DiscoveryGroupConstants;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
+
+/**
+ * A StaticClusterConnectorImpl
+ */
+class StaticClusterConnectorImpl implements ClusterConnector
+{
+ private final ClusterConnectionImpl clusterConnectionImpl;
+ private final List<TransportConfiguration> tcConfigs;
+
+ public StaticClusterConnectorImpl(ClusterConnectionImpl clusterConnectionImpl, DiscoveryGroupConfiguration dg)
+ {
+ this.clusterConnectionImpl = clusterConnectionImpl;
+ this.tcConfigs = (List<TransportConfiguration>)dg.getParams().get(DiscoveryGroupConstants.STATIC_CONNECTOR_CONFIG_LIST_NAME);
+ }
+
+ @Override
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
+ {
+ if (tcConfigs != null && tcConfigs.size() > 0)
+ {
+ if (ClusterConnectionImpl.log.isDebugEnabled())
+ {
+ ClusterConnectionImpl.log.debug(this.clusterConnectionImpl + "Creating a serverLocator for " + tcConfigs);
+ }
+ StaticServerLocatorImpl locator = new StaticServerLocatorImpl(includeTopology ? this.clusterConnectionImpl.topology : null,
+ true,
+ tcConfigs.toArray(new TransportConfiguration[0]));
+ locator.setClusterConnection(true);
+ return locator;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "StaticClusterConnector [tcConfigs=" + tcConfigs + "]";
+ }
+
+}
\ No newline at end of file
Added: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/UDPDiscoveryClusterConnectorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/UDPDiscoveryClusterConnectorImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/UDPDiscoveryClusterConnectorImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.UDPServerLocatorImpl;
+
+/**
+ * {@link ClusterConnector} that creates UDP-discovery server locators from a
+ * discovery group configuration.
+ * <p>
+ * NOTE(review): ClusterConnectionImpl resolves connector classes reflectively with a
+ * constructor lookup that takes only a DiscoveryGroupConfiguration; confirm that lookup
+ * matches the two-argument constructor declared here.
+ */
+class UDPDiscoveryClusterConnectorImpl implements ClusterConnector
+{
+   /** Cluster connection whose topology is shared with locators created on request. */
+   private final ClusterConnectionImpl owner;
+
+   /** Discovery group settings handed unchanged to every created locator. */
+   private final DiscoveryGroupConfiguration discoveryConfig;
+
+   /**
+    * @param clusterConnectionImpl the cluster connection this connector belongs to
+    * @param dg discovery group configuration used for the locators
+    */
+   public UDPDiscoveryClusterConnectorImpl(ClusterConnectionImpl clusterConnectionImpl, DiscoveryGroupConfiguration dg)
+   {
+      owner = clusterConnectionImpl;
+      discoveryConfig = dg;
+   }
+
+   /**
+    * Creates a new UDP server locator.
+    *
+    * @param includeTopology when {@code true} the owner's topology is passed to the locator,
+    *        otherwise {@code null} is passed
+    * @return the newly created locator
+    */
+   @Override
+   public ServerLocatorInternal createServerLocator(boolean includeTopology)
+   {
+      return new UDPServerLocatorImpl(includeTopology ? owner.topology : null, true, discoveryConfig);
+   }
+}
\ No newline at end of file
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -10,13 +10,14 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.api.core.DiscoveryGroupConstants;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
/**
* Manages a quorum of servers used to determine whether a given server is running or not.
@@ -93,7 +94,8 @@
if (targetServerID.equals(pair.getKey()))
continue;
TransportConfiguration serverTC = pair.getValue().getA();
- ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
+ StaticServerLocatorImpl locator =
+ (StaticServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
locatorsList.add(locator);
pool.submit(new ServerConnect(latch, pingCount, locator));
}
@@ -130,11 +132,11 @@
private static class ServerConnect implements Runnable
{
- private final ServerLocatorImpl locator;
+ private final StaticServerLocatorImpl locator;
private final CountDownLatch latch;
private final AtomicInteger count;
- public ServerConnect(CountDownLatch latch, AtomicInteger count, ServerLocatorImpl serverLocator)
+ public ServerConnect(CountDownLatch latch, AtomicInteger count, StaticServerLocatorImpl serverLocator)
{
locator = serverLocator;
this.latch = latch;
@@ -145,7 +147,9 @@
public void run()
{
locator.setReconnectAttempts(-1);
- locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
+ locator.getDiscoveryGroupConfiguration()
+ .getParams()
+ .put(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, DISCOVERY_TIMEOUT);
final ClientSessionFactory liveServerSessionFactory;
try
Modified: branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd 2011-11-22 17:23:39 UTC (rev 11740)
@@ -218,18 +218,12 @@
<xsd:element name="broadcast-group">
<xsd:complexType>
<xsd:sequence>
- <xsd:element maxOccurs="1" minOccurs="0" ref="local-bind-address">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" ref="local-bind-port">
+ <xsd:element maxOccurs="1" minOccurs="1" name="broadcast-type" type="broadcastType">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="1" ref="group-address">
+ <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="xsd:string">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="1" ref="group-port">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" ref="broadcast-period">
- </xsd:element>
- <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="xsd:string">
- </xsd:element>
+ <xsd:element maxOccurs="unbounded" minOccurs="0" name="param" type="paramType">
+ </xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:ID" use="required"/>
</xsd:complexType>
@@ -242,16 +236,10 @@
<xsd:element name="discovery-group">
<xsd:complexType>
<xsd:sequence>
- <xsd:element maxOccurs="1" minOccurs="0" ref="local-bind-address">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="1" ref="group-address">
+ <xsd:element maxOccurs="1" minOccurs="1" name="discovery-type" type="discoveryType">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="1" ref="group-port">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" ref="refresh-timeout">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" ref="initial-wait-timeout">
- </xsd:element>
+ <xsd:element maxOccurs="unbounded" minOccurs="0" name="param" type="paramType">
+ </xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:ID" use="required"/>
</xsd:complexType>
@@ -329,21 +317,12 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="password" type="xsd:string">
</xsd:element>
- <xsd:choice>
- <xsd:element maxOccurs="1" minOccurs="1" name="static-connectors">
- <xsd:complexType>
- <xsd:sequence>
- <xsd:element maxOccurs="unbounded" minOccurs="1" name="connector-ref" type="xsd:string"/>
- </xsd:sequence>
- </xsd:complexType>
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="1" name="discovery-group-ref">
- <xsd:complexType>
- <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
- </xsd:attribute>
- </xsd:complexType>
- </xsd:element>
- </xsd:choice>
+ <xsd:element maxOccurs="1" minOccurs="1" name="discovery-group-ref">
+ <xsd:complexType>
+ <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
+ </xsd:attribute>
+ </xsd:complexType>
+ </xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required"/>
</xsd:complexType>
@@ -365,22 +344,14 @@
<xsd:element maxOccurs="1" minOccurs="0" name="confirmation-window-size" type="xsd:int">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="call-timeout" type="xsd:long"/>
- <xsd:choice>
- <xsd:element maxOccurs="1" minOccurs="0" name="static-connectors">
- <xsd:complexType>
- <xsd:sequence>
- <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="xsd:string"/>
- </xsd:sequence>
- <xsd:attribute name="allow-direct-connections-only" type="xsd:boolean" use="optional"/>
- </xsd:complexType>
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="discovery-group-ref">
- <xsd:complexType>
- <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
- </xsd:attribute>
- </xsd:complexType>
- </xsd:element>
- </xsd:choice>
+ <xsd:element maxOccurs="1" minOccurs="1" name="discovery-group-ref">
+ <xsd:complexType>
+ <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
+ </xsd:attribute>
+ <xsd:attribute name="allow-direct-connections-only" type="xsd:boolean" use="optional">
+ </xsd:attribute>
+ </xsd:complexType>
+ </xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required"/>
</xsd:complexType>
@@ -542,4 +513,18 @@
<xsd:attribute name="name" type="xsd:string" use="optional"/>
</xsd:complexType>
+ <xsd:simpleType name="discoveryType">
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="STATIC"/>
+ <xsd:enumeration value="UDP"/>
+ <xsd:enumeration value="JGROUPS"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+
+ <xsd:simpleType name="broadcastType">
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="UDP"/>
+ <xsd:enumeration value="JGROUPS"/>
+ </xsd:restriction>
+ </xsd:simpleType>
</xsd:schema>
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/pom.xml
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/pom.xml (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/pom.xml 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,33 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Build settings are inherited from the hornetq-pom parent. -->
+ <parent>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-pom</artifactId>
+ <version>2.2.3-SNAPSHOT</version>
+ </parent>
+
+ <!-- Separate jar module holding the JGroups cluster discovery integration. -->
+ <artifactId>hornetq-jgroups-discovery</artifactId>
+ <packaging>jar</packaging>
+ <name>HornetQ JGroups Cluster Discovery Integration</name>
+
+ <dependencies>
+ <!-- HornetQ modules are taken at the same version as this module. -->
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- JGroups itself; the version is pinned here rather than inherited. -->
+ <dependency>
+ <groupId>org.jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>2.12.2.Final</version>
+ </dependency>
+ </dependencies>
+
+</project>
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+/**
+ * String constants for the JGroups broadcast group integration.
+ * <p>
+ * The {@code *_NAME} constants are presumably the parameter keys of a JGroups broadcast
+ * group configuration - TODO confirm against the broadcast group implementation that
+ * reads them.
+ *
+ * @author <a href="mailto:tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class BroadcastGroupConstants
+{
+ // Key naming the JGroups configuration file.
+ public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-file";
+ // Key naming the broadcast period.
+ public static final String BROADCAST_PERIOD_NAME = "broadcast-period";
+ // Key naming the JGroups channel name.
+ public static final String JGROUPS_CHANNEL_NAME_NAME = "jgroups-channel-name";
+
+ // Channel name constant; by its name the fallback when none is configured (verify at the point of use).
+ public static final String DEFAULT_JGROUPS_CHANNEL_NAME = "hornetq-jgroups-channel";
+}
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+/**
+ * String constants for the JGroups discovery group integration.
+ * <p>
+ * The {@code *_NAME} constants are presumably the parameter keys of a JGroups discovery
+ * group configuration - TODO confirm against the discovery group implementation that
+ * reads them.
+ * <p>
+ * NOTE(review): a class with the same simple name exists in {@code org.hornetq.api.core};
+ * check imports carefully when using either one.
+ *
+ * @author <a href="mailto:tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class DiscoveryGroupConstants
+{
+ // Key naming the JGroups configuration file.
+ // NOTE(review): the broadcast-side constant of the same name uses "jgroups-configuration-file"
+ // (no "name" suffix) - confirm the difference is intended.
+ public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-filename";
+ // Key naming the initial wait timeout.
+ public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+ // Key naming the refresh timeout.
+ public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+ // Key naming the JGroups channel name.
+ public static final String JGROUPS_CHANNEL_NAME_NAME = "jgroups-channel-name";
+
+ // Channel name constant; by its name the fallback when none is configured (verify at the point of use).
+ public static final String DEFAULT_JGROUPS_CHANNEL_NAME = "hornetq-jgroups-channel";
+}
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryClusterConnectorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryClusterConnectorImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryClusterConnectorImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
+import org.hornetq.core.server.cluster.impl.ClusterConnector;
+/**
+ * {@link ClusterConnector} that creates JGroups-based server locators for a cluster
+ * connection.
+ * <p>
+ * The class is public on purpose: it lives outside the core cluster package, and the
+ * cluster connection loads connector implementations by class name and instantiates them
+ * reflectively. {@code Constructor.newInstance} refuses a package-private class from
+ * another package with {@code IllegalAccessException}.
+ * <p>
+ * NOTE(review): ClusterConnectionImpl resolves connector classes reflectively with a
+ * constructor lookup that takes only a DiscoveryGroupConfiguration; confirm that lookup
+ * matches the two-argument constructor declared here.
+ *
+ * @author <a href="mailto:tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class JGroupsDiscoveryClusterConnectorImpl implements ClusterConnector
+{
+   /** Cluster connection whose topology is shared with locators created on request. */
+   private final ClusterConnectionImpl clusterConnectionImpl;
+
+   /** Discovery group settings handed unchanged to every created locator. */
+   private final DiscoveryGroupConfiguration dg;
+
+   /**
+    * @param clusterConnectionImpl the cluster connection this connector belongs to
+    * @param dg discovery group configuration used for the locators
+    */
+   public JGroupsDiscoveryClusterConnectorImpl(ClusterConnectionImpl clusterConnectionImpl, DiscoveryGroupConfiguration dg)
+   {
+      this.clusterConnectionImpl = clusterConnectionImpl;
+      this.dg = dg;
+   }
+
+   /**
+    * Creates a new JGroups server locator.
+    *
+    * @param includeTopology when {@code true} the cluster connection's topology is passed to
+    *        the locator, otherwise {@code null} is passed
+    * @return the newly created locator
+    */
+   @Override
+   public ServerLocatorInternal createServerLocator(boolean includeTopology)
+   {
+      return new JGroupsServerLocatorImpl(includeTopology ? clusterConnectionImpl.getTopology() : null, true, dg);
+   }
+}
\ No newline at end of file
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.TypedProperties;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.ReceiverAdapter;
+import org.jgroups.util.Util;
+
+/**
+ * A {@link DiscoveryGroup} that learns about cluster nodes from broadcasts received on a JGroups channel.
+ *
+ * @author <a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>
+ */
+public class JGroupsDiscoveryGroupImpl extends ReceiverAdapter implements DiscoveryGroup
+{
+ private static final Logger log = Logger.getLogger(JGroupsDiscoveryGroupImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
+
+ private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+ private final String name;
+
+ private boolean received;
+
+ private final Object waitLock = new Object();
+
+ private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<String, DiscoveryEntry>();
+
+ private final long timeout;
+
+ private volatile boolean started;
+
+ private final String nodeID;
+
+ private final Map<String, String> uniqueIDMap = new HashMap<String, String>();
+
+ private NotificationService notificationService;
+
+ private String jgroupsChannelName;
+
+ private URL configURL;
+
+private JChannel discoveryChannel;
+
+ /**
+ * Creates a discovery group; no channel is opened until {@link #start()} is called.
+ *
+ * @param nodeID id of the local node; connectors from broadcasts carrying this id are not recorded
+ * @param name name reported in the started/stopped notifications
+ * @param channelName name of the JGroups channel to connect to
+ * @param confURL location of the JGroups configuration handed to the channel
+ * @param timeout age in milliseconds after which a discovered entry is treated as expired
+ */
+ public JGroupsDiscoveryGroupImpl(final String nodeID,
+ final String name,
+ final String channelName,
+ final URL confURL,
+ final long timeout) throws Exception
+ {
+ this.nodeID = nodeID;
+ this.name = name;
+ this.jgroupsChannelName = channelName;
+ this.configURL = confURL;
+ this.timeout = timeout;
+ }
+
+ /**
+ * Sets the service used to publish the DISCOVERY_GROUP_STARTED and DISCOVERY_GROUP_STOPPED
+ * notifications. While it is null, start() and stop() send no notification.
+ */
+ public void setNotificationService(final NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
+
+ /**
+  * Creates the JGroups channel, registers this object as its receiver and connects to the
+  * configured channel name. Calling it on an already started group is a no-op.
+  * <p>
+  * A failure to create or join the channel is logged and the group stays stopped; the
+  * partially initialised channel is closed so its resources are not leaked.
+  * When a notification service is set, a DISCOVERY_GROUP_STARTED notification is sent.
+  *
+  * @throws Exception if sending the notification fails
+  */
+ public synchronized void start() throws Exception
+ {
+    if (started)
+    {
+       return;
+    }
+
+    JChannel channel = null;
+
+    try
+    {
+       channel = new JChannel(configURL);
+
+       channel.setReceiver(this);
+
+       channel.connect(this.jgroupsChannelName);
+    }
+    catch (Exception e)
+    {
+       log.error("Failed to join jgroups channel", e);
+
+       // The channel may have been created before connect() failed: release it
+       if (channel != null)
+       {
+          try
+          {
+             channel.close();
+          }
+          catch (Exception closeFailure)
+          {
+             log.warn("Failed to close jgroups channel after unsuccessful start", closeFailure);
+          }
+       }
+
+       return;
+    }
+
+    // Publish the channel only once it is fully connected
+    discoveryChannel = channel;
+
+    started = true;
+
+    if (notificationService != null)
+    {
+       TypedProperties props = new TypedProperties();
+
+       props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+
+       Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props);
+
+       notificationService.sendNotification(notification);
+    }
+ }
+
+ /**
+  * Stops the group: marks it stopped, wakes up threads blocked in
+  * {@link #waitForBroadcast(long)}, shuts down and closes the JGroups channel and, when a
+  * notification service is set, sends a DISCOVERY_GROUP_STOPPED notification.
+  * Does nothing when the group is not started.
+  */
+ public void stop()
+ {
+    synchronized (this)
+    {
+       if (!started)
+       {
+          return;
+       }
+
+       started = false;
+    }
+
+    // Waiters re-check the started flag once woken up
+    synchronized (waitLock)
+    {
+       waitLock.notifyAll();
+    }
+
+    try
+    {
+       Util.shutdown(discoveryChannel);
+    }
+    catch (Exception shutdownFailure)
+    {
+       log.warn("unable to shutdown JGroups Channel", shutdownFailure);
+    }
+
+    discoveryChannel.close();
+    discoveryChannel = null;
+
+    if (notificationService == null)
+    {
+       return;
+    }
+
+    final TypedProperties stoppedProps = new TypedProperties();
+    stoppedProps.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+    final Notification stopped = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STOPPED, stoppedProps);
+
+    try
+    {
+       notificationService.sendNotification(stopped);
+    }
+    catch (Exception notifyFailure)
+    {
+       log.warn("unable to send notification when discovery group is stopped", notifyFailure);
+    }
+ }
+
+ /**
+ * @return true between a successful start() and the next stop()
+ */
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ /**
+ * @return the name given at construction time
+ */
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+  * @return a new list holding a copy of the currently known discovery entries
+  */
+ public synchronized List<DiscoveryEntry> getDiscoveryEntries()
+ {
+    return new ArrayList<DiscoveryEntry>(connectors.values());
+ }
+
+ /**
+  * Blocks until a broadcast from another node has been received, the group is stopped, or
+  * the timeout elapses.
+  * <p>
+  * An interrupt does not abort the wait, but the thread's interrupt status is restored
+  * before returning so callers can still observe it.
+  *
+  * @param timeout maximum time to wait in milliseconds; 0 waits without a time limit
+  *                (note: this parameter shadows the expiry timeout field)
+  * @return true if a broadcast was received; the received flag is reset on return
+  */
+ public boolean waitForBroadcast(final long timeout)
+ {
+    boolean interrupted = false;
+
+    try
+    {
+       synchronized (waitLock)
+       {
+          long start = System.currentTimeMillis();
+
+          long toWait = timeout;
+
+          while (started && !received && (toWait > 0 || timeout == 0))
+          {
+             try
+             {
+                waitLock.wait(toWait);
+             }
+             catch (InterruptedException e)
+             {
+                // Keep waiting, but remember the interrupt instead of losing it
+                interrupted = true;
+             }
+
+             if (timeout != 0)
+             {
+                long now = System.currentTimeMillis();
+
+                toWait -= now - start;
+
+                start = now;
+             }
+          }
+
+          boolean ret = received;
+
+          received = false;
+
+          return ret;
+       }
+    }
+    finally
+    {
+       if (interrupted)
+       {
+          Thread.currentThread().interrupt();
+       }
+    }
+ }
+
+ /**
+  * Handles a broadcast received from the JGroups channel. Ignored while the group is stopped.
+  * <p>
+  * The payload is read as: originating node id (string), unique id (string), connector
+  * count (int), then that many encoded {@link TransportConfiguration}s. Broadcasts from the
+  * local node only trigger an expiration check. For other nodes the connectors are recorded,
+  * expired entries are purged, listeners are called if anything changed and threads blocked
+  * in {@link #waitForBroadcast(long)} are notified.
+  */
+ @Override
+ public void receive(Message msg)
+ {
+    if (!started)
+    {
+       return;
+    }
+
+    HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(msg.getBuffer());
+
+    String originatingNodeID = buffer.readString();
+
+    String uniqueID = buffer.readString();
+
+    checkUniqueID(originatingNodeID, uniqueID);
+
+    if (nodeID.equals(originatingNodeID))
+    {
+       if (checkExpiration())
+       {
+          callListeners();
+       }
+
+       // Ignore traffic from own node
+       return;
+    }
+
+    int size = buffer.readInt();
+
+    boolean changed = false;
+
+    synchronized (this)
+    {
+       for (int i = 0; i < size; i++)
+       {
+          TransportConfiguration connector = new TransportConfiguration();
+
+          connector.decode(buffer);
+
+          DiscoveryEntry entry = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
+
+          DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
+
+          if (oldVal == null)
+          {
+             changed = true;
+          }
+       }
+
+       // Always run the expiration check: with "changed || checkExpiration()" it was skipped
+       // whenever a new node had just been added, leaving stale entries visible to listeners
+       changed = checkExpiration() || changed;
+    }
+
+    if (changed)
+    {
+       callListeners();
+    }
+
+    synchronized (waitLock)
+    {
+       received = true;
+
+       waitLock.notify();
+    }
+ }
+
+ /**
+  * Sanity check that catches two different nodes broadcasting the same node id, either due
+  * to misconfiguration or problems in failover. The latest unique id seen for the node id
+  * is remembered; a warning is logged whenever it differs from the previous one.
+  */
+ private void checkUniqueID(final String originatingNodeID, final String uniqueID)
+ {
+    final String knownUniqueID = uniqueIDMap.get(originatingNodeID);
+
+    if (knownUniqueID != null)
+    {
+       if (knownUniqueID.equals(uniqueID))
+       {
+          // Same broadcaster as before: nothing to record
+          return;
+       }
+
+       log.warn("There are more than one servers on the network broadcasting the same node id. "
+                + "You will see this message exactly once (per node) if a node is restarted, in which case it can be safely "
+                + "ignored. But if it is logged continuously it means you really do have more than one node on the same network "
+                + "active concurrently with the same node id. This could occur if you have a backup node active at the same time as "
+                + "its live node. nodeID=" + originatingNodeID);
+    }
+
+    uniqueIDMap.put(originatingNodeID, uniqueID);
+ }
+
+ /**
+ * Adds a listener. If connectors are already known, the new listener is told
+ * immediately through connectorsChanged().
+ */
+ public synchronized void registerListener(final DiscoveryListener listener)
+ {
+ listeners.add(listener);
+
+ if (!connectors.isEmpty())
+ {
+ listener.connectorsChanged();
+ }
+ }
+
+ /**
+ * Removes a previously registered listener.
+ */
+ public synchronized void unregisterListener(final DiscoveryListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ /**
+  * Notifies every registered listener that the set of connectors changed.
+  * <p>
+  * The listener list is copied while holding this object's monitor (the same lock used by
+  * registerListener/unregisterListener), so a concurrent registration cannot break the
+  * iteration. The listeners themselves are invoked outside the lock.
+  */
+ private void callListeners()
+ {
+    final List<DiscoveryListener> snapshot;
+
+    synchronized (this)
+    {
+       snapshot = new ArrayList<DiscoveryListener>(listeners);
+    }
+
+    for (DiscoveryListener listener : snapshot)
+    {
+       try
+       {
+          listener.connectorsChanged();
+       }
+       catch (Throwable t)
+       {
+          // Catch it so exception doesn't prevent other listeners from running
+          JGroupsDiscoveryGroupImpl.log.error("Failed to call discovery listener", t);
+       }
+    }
+ }
+
+ /**
+  * Removes every connector whose last update is at least {@code timeout} milliseconds old.
+  *
+  * @return true if at least one entry was removed
+  */
+ private boolean checkExpiration()
+ {
+    final long now = System.currentTimeMillis();
+
+    boolean anyRemoved = false;
+
+    // Weed out any expired connectors
+    for (Iterator<Map.Entry<String, DiscoveryEntry>> it = connectors.entrySet().iterator(); it.hasNext();)
+    {
+       final Map.Entry<String, DiscoveryEntry> candidate = it.next();
+
+       if (candidate.getValue().getLastUpdate() + timeout > now)
+       {
+          // Still fresh
+          continue;
+       }
+
+       if (isTrace)
+       {
+          log.trace("Timed out node on discovery:" + candidate.getValue());
+       }
+
+       it.remove();
+
+       anyRemoved = true;
+    }
+
+    return anyRemoved;
+ }
+
+}
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-11-22 17:23:39 UTC (rev 11740)
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.AbstractServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A server locator whose discovery group is a {@link JGroupsDiscoveryGroupImpl}.
+ *
+ * @author <a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>
+ */
+public class JGroupsServerLocatorImpl extends AbstractServerLocator
+{
+ private static final long serialVersionUID = -1086825204860145543L;
+
+ private static final Logger log = Logger.getLogger(JGroupsServerLocatorImpl.class);
+
+ private String discoveryGroupName;
+
+ private long refreshTimeout;
+
+ private long initialWaitTimeout;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private final Exception e = new Exception();
+
+ private String jgroupsConfigurationFileName;
+
+ private String jgroupsChannelName;
+
+ /**
+  * Reads the JGroups settings from the discovery group configuration parameters, then
+  * creates the {@link JGroupsDiscoveryGroupImpl}, registers this locator as its listener
+  * and starts it.
+  *
+  * @throws IllegalArgumentException if no JGroups configuration file name is configured
+  * @throws Exception if the discovery group cannot be created or started
+  */
+ @Override
+ protected synchronized void initialiseInternal() throws Exception
+ {
+    this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+
+    Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+
+    this.jgroupsChannelName =
+       ConfigurationHelper.getStringProperty(
+          DiscoveryGroupConstants.JGROUPS_CHANNEL_NAME_NAME,
+          DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME,
+          params);
+
+    this.initialWaitTimeout =
+       ConfigurationHelper.getLongProperty(
+          DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME,
+          HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
+          params);
+
+    this.refreshTimeout =
+       ConfigurationHelper.getLongProperty(
+          DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME,
+          ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
+          params);
+
+    this.jgroupsConfigurationFileName =
+       ConfigurationHelper.getStringProperty(
+          DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME,
+          null,
+          params);
+
+    // There is no default for the configuration file; passing null to
+    // ClassLoader.getResource would only fail with an unexplained NullPointerException
+    if (this.jgroupsConfigurationFileName == null)
+    {
+       throw new IllegalArgumentException("Discovery group '" + this.discoveryGroupName +
+                                          "' does not define the mandatory parameter '" +
+                                          DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME +
+                                          "'");
+    }
+
+    // NOTE(review): getResource returns null when the file is not on the classpath; that
+    // null is still passed on unchanged here — confirm whether to fail fast in that case too
+    this.discoveryGroup =
+       new JGroupsDiscoveryGroupImpl(
+          getNodeID(),
+          this.discoveryGroupName,
+          this.jgroupsChannelName,
+          Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName),
+          this.refreshTimeout);
+
+    discoveryGroup.registerListener(this);
+
+    discoveryGroup.start();
+ }
+
+ /**
+ * Common constructor used by all public constructors. Passes its arguments to the
+ * superclass constructor and refreshes the stack trace held in the field e.
+ */
+ private JGroupsServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration,
+ final TransportConfiguration[] transportConfigs)
+ {
+ super(topology, useHA, discoveryGroupConfiguration, transportConfigs);
+
+ // NOTE(review): presumably kept to help diagnose where the locator was created — confirm
+ e.fillInStackTrace();
+ }
+
+ /**
+ * Create a JGroupsServerLocatorImpl using JGroups discovery to lookup cluster.
+ * When useHA is true a new Topology is created and this locator becomes its owner.
+ *
+ * @param useHA whether a Topology is created for this locator
+ * @param groupConfiguration the discovery group configuration
+ */
+ public JGroupsServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(useHA ? new Topology(null) : null, useHA, groupConfiguration, null);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ getTopology().setOwner(this);
+ }
+ }
+
+ /**
+ * Create a JGroupsServerLocatorImpl using a static list of live servers.
+ * When useHA is true a new Topology is created and this locator becomes its owner.
+ *
+ * @param useHA whether a Topology is created for this locator
+ * @param transportConfigs the static list of server connectors
+ */
+ public JGroupsServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
+ {
+ this(useHA ? new Topology(null) : null, useHA, null, transportConfigs);
+ if (useHA)
+ {
+ // We only set the owner at where the Topology was created.
+ // For that reason we can't set it at the main constructor
+ getTopology().setOwner(this);
+ }
+ }
+
+ /**
+ * Create a JGroupsServerLocatorImpl using JGroups discovery to lookup cluster, with a
+ * Topology supplied by the caller (the owner of the topology is not changed here).
+ *
+ * @param topology the topology to use, may be null
+ * @param useHA passed through to the superclass
+ * @param groupConfiguration the discovery group configuration
+ */
+ public JGroupsServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(topology, useHA, groupConfiguration, null);
+
+ }
+
+ /**
+ * Create a JGroupsServerLocatorImpl using a static list of live servers, with a
+ * Topology supplied by the caller (the owner of the topology is not changed here).
+ *
+ * @param topology the topology to use, may be null
+ * @param useHA passed through to the superclass
+ * @param transportConfigs the static list of server connectors
+ */
+ public JGroupsServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final TransportConfiguration... transportConfigs)
+ {
+ this(topology, useHA, null, transportConfigs);
+ }
+
+ /**
+ * Creates a session factory through createSessionFactory() and returns it cast to
+ * ClientSessionFactoryInternal.
+ */
+ @Override
+ public ClientSessionFactoryInternal connect() throws Exception
+ {
+ return (ClientSessionFactoryInternal)createSessionFactory();
+ }
+
+ /**
+  * Stops the discovery group when the locator is closed. Does nothing if the discovery
+  * group was never created (the locator was closed before being initialised); failures
+  * while stopping are logged and not propagated.
+  */
+ @Override
+ protected void doCloseInternal()
+ {
+    // discoveryGroup is only assigned in initialiseInternal(); without this guard the
+    // resulting NullPointerException was caught below and logged as a bogus stop failure
+    if (discoveryGroup == null)
+    {
+       return;
+    }
+
+    try
+    {
+       discoveryGroup.stop();
+    }
+    catch (Exception stopFailure)
+    {
+       log.error("Failed to stop discovery group", stopFailure);
+    }
+ }
+
+ /**
+  * Waits for an initial broadcast so that at least one node of the cluster is known.
+  *
+  * @throws HornetQException with code CONNECTION_TIMEDOUT if no broadcast was received
+  */
+ @Override
+ protected void waitInitialDiscovery() throws Exception
+ {
+    // Cluster connections pass 0; every other client uses the configured initial wait
+    final long waitMillis = isClusterConnection() ? 0 : initialWaitTimeout;
+
+    if (discoveryGroup.waitForBroadcast(waitMillis))
+    {
+       return;
+    }
+
+    throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                               "Timed out waiting to receive initial broadcast from cluster");
+ }
+
+}
Modified: branches/HORNETQ-316/pom.xml
===================================================================
--- branches/HORNETQ-316/pom.xml 2011-11-22 15:46:34 UTC (rev 11739)
+++ branches/HORNETQ-316/pom.xml 2011-11-22 17:23:39 UTC (rev 11740)
@@ -133,6 +133,7 @@
<module>hornetq-service-sar</module>
<module>hornetq-spring-integration</module>
<module>hornetq-twitter-integration</module>
+ <module>hornetq-jgroups-discovery</module>
</modules>
<dependencyManagement>
@@ -245,8 +246,13 @@
<!-- there is a new version of this JAR but it breaks our usage of it -->
<version>2.1.2</version>
</dependency>
- <!--needed to compile the spring support-->
<dependency>
+ <groupId>org.jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>2.12.2.Final</version>
+ </dependency>
+ <!--needed to compile the spring support-->
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>3.0.3.RELEASE</version>
13 years, 1 month
JBoss hornetq SVN: r11739 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/cluster and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-22 10:46:34 -0500 (Tue, 22 Nov 2011)
New Revision: 11739
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-7535 - Fixing topology for the colocated backup case
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-22 07:23:59 UTC (rev 11738)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-22 15:46:34 UTC (rev 11739)
@@ -177,17 +177,16 @@
return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
}
};
-
- final boolean isCC = msg.isClusterConnection();
+
if (acceptorUsed.getClusterConnection() != null)
{
- acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+ acceptorUsed.getClusterConnection().addClusterTopologyListener(listener);
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
{
- acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+ acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener);
}
});
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-11-22 07:23:59 UTC (rev 11738)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-11-22 15:46:34 UTC (rev 11739)
@@ -42,9 +42,9 @@
void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
- void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+ void addClusterTopologyListener(ClusterTopologyListener listener);
- void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+ void removeClusterTopologyListener(ClusterTopologyListener listener);
/**
* @return a Map of node ID and addresses
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-22 07:23:59 UTC (rev 11738)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-22 15:46:34 UTC (rev 11739)
@@ -246,7 +246,7 @@
clusterConnector = new StaticClusterConnector(tcConfigs);
- backupServerLocator = clusterConnector.createServerLocator(false);
+ backupServerLocator = clusterConnector.createServerLocator();
if (backupServerLocator != null)
{
@@ -356,7 +356,7 @@
clusterConnector = new DiscoveryClusterConnector(dg);
- backupServerLocator = clusterConnector.createServerLocator(true);
+ backupServerLocator = clusterConnector.createServerLocator();
if (backupServerLocator != null)
{
@@ -507,7 +507,7 @@
return topology.getMember(manager.getNodeId());
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topology.addClusterTopologyListener(listener);
@@ -515,7 +515,7 @@
topology.sendTopology(listener);
}
- public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
{
topology.removeClusterTopologyListener(listener);
}
@@ -642,7 +642,7 @@
backupServerLocator = null;
}
- serverLocator = clusterConnector.createServerLocator(true);
+ serverLocator = clusterConnector.createServerLocator();
if (serverLocator != null)
{
@@ -680,7 +680,7 @@
this.serverLocator.setRetryInterval(retryInterval);
}
- serverLocator.addClusterTopologyListener(this);
+ addClusterTopologyListener(this);
serverLocator.setAfterConnectionInternalListener(this);
@@ -1567,7 +1567,7 @@
interface ClusterConnector
{
- ServerLocatorInternal createServerLocator(boolean includeTopology);
+ ServerLocatorInternal createServerLocator();
}
private class StaticClusterConnector implements ClusterConnector
@@ -1579,7 +1579,7 @@
this.tcConfigs = tcConfigs;
}
- public ServerLocatorInternal createServerLocator(boolean includeTopology)
+ public ServerLocatorInternal createServerLocator()
{
if (tcConfigs != null && tcConfigs.length > 0)
{
@@ -1587,7 +1587,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
return locator;
}
@@ -1617,9 +1617,9 @@
this.dg = dg;
}
- public ServerLocatorInternal createServerLocator(boolean includeTopology)
+ public ServerLocatorInternal createServerLocator()
{
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, dg);
return locator;
}
13 years, 1 month
JBoss hornetq SVN: r11738 - in trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-11-22 02:23:59 -0500 (Tue, 22 Nov 2011)
New Revision: 11738
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
code refactor
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22 04:12:00 UTC (rev 11737)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22 07:23:59 UTC (rev 11738)
@@ -132,8 +132,6 @@
return receipt;
}
-
- public abstract StompFrame postprocess(StompFrame request);
public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
StompSubscription subscription, int deliveryCount) throws Exception;
@@ -278,4 +276,26 @@
return response;
}
+ /**
+  * Runs after a frame has been handled.
+  * <p>
+  * If the request asked for a receipt, the receipt frame is returned; for a DISCONNECT
+  * request that frame is flagged so the connection is closed afterwards. Without a
+  * receipt request, a DISCONNECT closes the connection right away and null is returned.
+  *
+  * @param request the frame that was just processed
+  * @return the receipt frame, or null when no receipt was requested
+  */
+ public StompFrame postprocess(StompFrame request)
+ {
+    if (!request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+    {
+       // No receipt to send back: a DISCONNECT can be honoured immediately
+       if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+       {
+          this.connection.disconnect();
+       }
+       return null;
+    }
+
+    StompFrame receipt = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+    if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+    {
+       receipt.setNeedsDisconnect(true);
+    }
+    return receipt;
+ }
+
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22 04:12:00 UTC (rev 11737)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22 07:23:59 UTC (rev 11738)
@@ -246,28 +246,5 @@
// TODO Auto-generated method stub
}
-
- @Override
- public StompFrame postprocess(StompFrame request)
- {
- StompFrame response = null;
- if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
- {
- response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- response.setNeedsDisconnect(true);
- }
- }
- else
- {
- //request null, disconnect if so.
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- this.connection.disconnect();
- }
- }
- return response;
- }
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22 04:12:00 UTC (rev 11737)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22 07:23:59 UTC (rev 11738)
@@ -166,29 +166,6 @@
}
return null;
}
-
- @Override
- public StompFrame postprocess(StompFrame request)
- {
- StompFrame response = null;
- if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
- {
- response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- response.setNeedsDisconnect(true);
- }
- }
- else
- {
- //request null, disconnect if so.
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- this.connection.disconnect();
- }
- }
- return response;
- }
@Override
public StompFrame onUnsubscribe(StompFrame request)
13 years, 1 month
JBoss hornetq SVN: r11737 - in trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-11-21 23:12:00 -0500 (Mon, 21 Nov 2011)
New Revision: 11737
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
code refactor
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22 02:33:35 UTC (rev 11736)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22 04:12:00 UTC (rev 11737)
@@ -12,13 +12,14 @@
*/
package org.hornetq.core.protocol.stomp;
-import java.io.UnsupportedEncodingException;
-
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
/**
*
@@ -113,12 +114,7 @@
public abstract StompFrame onConnect(StompFrame frame);
public abstract StompFrame onDisconnect(StompFrame frame);
- public abstract StompFrame onSend(StompFrame frame);
public abstract StompFrame onAck(StompFrame request);
- public abstract StompFrame onBegin(StompFrame frame);
- public abstract StompFrame onCommit(StompFrame request);
- public abstract StompFrame onAbort(StompFrame request);
- public abstract StompFrame onSubscribe(StompFrame request);
public abstract StompFrame onUnsubscribe(StompFrame request);
public abstract StompFrame onStomp(StompFrame request);
public abstract StompFrame onNack(StompFrame request);
@@ -145,5 +141,141 @@
public abstract StompFrame createStompFrame(String command);
public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException;
+
+ /**
+  * Handles a COMMIT frame by committing the transaction named in its transaction header.
+  *
+  * @return null on success, otherwise an error frame (missing header or commit failure)
+  */
+ public StompFrame onCommit(StompFrame request)
+ {
+    final String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+    if (txID == null)
+    {
+       return new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+    }
+
+    try
+    {
+       connection.commitTransaction(txID);
+       return null;
+    }
+    catch (HornetQStompException e)
+    {
+       return e.getFrame();
+    }
+ }
+
+ /**
+  * Handles a SEND frame: validates the connection, builds a server message from the frame
+  * and sends it, inside the transaction named by the transaction header when present.
+  * A frame with a content-length header becomes a bytes message; otherwise a text message.
+  *
+  * @return null on success, otherwise an error frame describing the failure
+  */
+ public StompFrame onSend(StompFrame frame)
+ {
+    try
+    {
+       connection.validate();
+
+       final String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+       final String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+       final long timestamp = System.currentTimeMillis();
+
+       final ServerMessageImpl message = connection.createServerMessage();
+       message.setTimestamp(timestamp);
+       message.setAddress(SimpleString.toSimpleString(destination));
+       StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+
+       if (!frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+       {
+          message.setType(Message.TEXT_TYPE);
+          String text = frame.getBody();
+          message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+       }
+       else
+       {
+          message.setType(Message.BYTES_TYPE);
+          message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+       }
+
+       connection.sendServerMessage(message, txID);
+       return null;
+    }
+    catch (HornetQStompException e)
+    {
+       return e.getFrame();
+    }
+    catch (Exception e)
+    {
+       return new HornetQStompException("Error handling send", e).getFrame();
+    }
+ }
+
+ /**
+  * Handles a BEGIN frame by starting the transaction named in its transaction header.
+  *
+  * @return null on success, otherwise an error frame (missing header or begin failure)
+  */
+ public StompFrame onBegin(StompFrame frame)
+ {
+    final String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+    if (txID == null)
+    {
+       return new HornetQStompException("Need a transaction id to begin").getFrame();
+    }
+
+    try
+    {
+       connection.beginTransaction(txID);
+       return null;
+    }
+    catch (HornetQStompException e)
+    {
+       return e.getFrame();
+    }
+ }
+
+ /**
+  * Handles an ABORT frame by aborting the transaction named in its transaction header.
+  *
+  * @return null on success, otherwise an error frame (missing header or abort failure)
+  */
+ public StompFrame onAbort(StompFrame request)
+ {
+    final String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+    if (txID == null)
+    {
+       return new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+    }
+
+    try
+    {
+       connection.abortTransaction(txID);
+       return null;
+    }
+    catch (HornetQStompException e)
+    {
+       return e.getFrame();
+    }
+ }
+
+ /**
+  * Handles a SUBSCRIBE frame: reads the destination, selector, ack mode, id, durable
+  * subscriber name and no-local headers and subscribes the connection accordingly.
+  * The no-local flag is false when its header is absent.
+  *
+  * @return null on success, otherwise the error frame of the failure
+  */
+ public StompFrame onSubscribe(StompFrame request)
+ {
+    final String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+    final String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+    final String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+    final String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+    final String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+
+    final boolean noLocal = request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)
+                            && Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+
+    try
+    {
+       connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+       return null;
+    }
+    catch (HornetQStompException e)
+    {
+       return e.getFrame();
+    }
+ }
+
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22 02:33:35 UTC (rev 11736)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22 04:12:00 UTC (rev 11737)
@@ -31,7 +31,6 @@
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
/**
@@ -101,147 +100,6 @@
}
@Override
- public StompFrame onSend(StompFrame frame)
- {
- StompFrame response = null;
- try
- {
- connection.validate();
- String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
-
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = connection.createServerMessage();
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = frame.getBody();
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- connection.sendServerMessage(message, txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- catch (Exception e)
- {
- response = new HornetQStompException("Error handling send", e).getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onBegin(StompFrame frame)
- {
- StompFrame response = null;
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("Need a transaction id to begin").getFrame();
- }
- else
- {
- try
- {
- connection.beginTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- }
- return response;
- }
-
- @Override
- public StompFrame onCommit(StompFrame request)
- {
- StompFrame response = null;
-
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.commitTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- return response;
- }
-
- @Override
- public StompFrame onAbort(StompFrame request)
- {
- StompFrame response = null;
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
-
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.abortTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onSubscribe(StompFrame request)
- {
- StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
-
- String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
- String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
- String id = request.getHeader(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
-
- if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
- }
-
- try
- {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
public StompFrame onUnsubscribe(StompFrame request)
{
StompFrame response = null;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22 02:33:35 UTC (rev 11736)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22 04:12:00 UTC (rev 11737)
@@ -17,7 +17,6 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
@@ -33,7 +32,6 @@
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
/**
@@ -193,147 +191,6 @@
}
@Override
- public StompFrame onSend(StompFrame frame)
- {
- StompFrame response = null;
- try
- {
- connection.validate();
- String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
-
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = connection.createServerMessage();
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = frame.getBody();
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- connection.sendServerMessage(message, txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- catch (Exception e)
- {
- response = new HornetQStompException("Error handling send", e).getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onBegin(StompFrame frame)
- {
- StompFrame response = null;
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("Need a transaction id to begin").getFrame();
- }
- else
- {
- try
- {
- connection.beginTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- }
- return response;
- }
-
- @Override
- public StompFrame onCommit(StompFrame request)
- {
- StompFrame response = null;
-
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.commitTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- return response;
- }
-
- @Override
- public StompFrame onAbort(StompFrame request)
- {
- StompFrame response = null;
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
-
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.abortTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onSubscribe(StompFrame request)
- {
- StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
-
- String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
- String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
- String id = request.getHeader(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
-
- if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
- }
-
- try
- {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
public StompFrame onUnsubscribe(StompFrame request)
{
StompFrame response = null;
13 years, 1 month
JBoss hornetq SVN: r11736 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 21:33:35 -0500 (Mon, 21 Nov 2011)
New Revision: 11736
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
JBPAPP-6030 - fixing transferring lock on receiveImmediate()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-22 01:50:41 UTC (rev 11735)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-22 02:33:35 UTC (rev 11736)
@@ -399,12 +399,29 @@
// We execute this on the same executor to make sure the force delivery message is written after
// any delivery is completed
- ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
-
- forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
-
- callback.sendMessage(forcedDeliveryMessage, id, 0);
+ synchronized (lock)
+ {
+ if (transferring)
+ {
+ // Case it's transferring (reattach), we will retry later
+ messageQueue.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ forceDelivery(sequence);
+ }
+ });
+ }
+ else
+ {
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ }
}
catch (Exception e)
{
13 years, 1 month
JBoss hornetq SVN: r11735 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 20:50:41 -0500 (Mon, 21 Nov 2011)
New Revision: 11735
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
Removed:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
Moving InVMNodeManager to the test packages, as was supposed to be done (no semantic changes in this commit)
Deleted: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -1,144 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.server.NodeManager;
-import org.hornetq.utils.UUIDGenerator;
-
-import java.util.concurrent.Semaphore;
-
-import static org.hornetq.core.server.impl.InVMNodeManager.State.*;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
- * Date: Oct 13, 2010
- * Time: 3:55:47 PM
- */
-public class InVMNodeManager extends NodeManager
-{
-
- private Semaphore liveLock;
-
- private Semaphore backupLock;
-
- public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
-
- public State state = NOT_STARTED;
-
- public InVMNodeManager()
- {
- liveLock = new Semaphore(1);
- backupLock = new Semaphore(1);
- uuid = UUIDGenerator.getInstance().generateUUID();
- nodeID = new SimpleString(uuid.toString());
- }
-
- @Override
- public void awaitLiveNode() throws Exception
- {
- do
- {
- while (state == NOT_STARTED)
- {
- Thread.sleep(2000);
- }
-
- liveLock.acquire();
-
- if (state == PAUSED)
- {
- liveLock.release();
- Thread.sleep(2000);
- }
- else if (state == FAILING_BACK)
- {
- liveLock.release();
- Thread.sleep(2000);
- }
- else if (state == LIVE)
- {
- break;
- }
- }
- while (true);
- }
-
- @Override
- public void startBackup() throws Exception
- {
- backupLock.acquire();
- }
-
- @Override
- public void startLiveNode() throws Exception
- {
- state = FAILING_BACK;
- liveLock.acquire();
- state = LIVE;
- }
-
- @Override
- public void pauseLiveServer() throws Exception
- {
- state = PAUSED;
- liveLock.release();
- }
-
- @Override
- public void crashLiveServer() throws Exception
- {
- //overkill as already set to live
- state = LIVE;
- liveLock.release();
- }
-
- @Override
- public void stopBackup() throws Exception
- {
- backupLock.release();
- }
-
- @Override
- public void releaseBackup()
- {
- releaseBackupNode();
- }
-
- @Override
- public boolean isAwaitingFailback() throws Exception
- {
- return state == FAILING_BACK;
- }
-
- @Override
- public boolean isBackupLive() throws Exception
- {
- return liveLock.availablePermits() == 0;
- }
-
- @Override
- public void interrupt()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- private void releaseBackupNode()
- {
- if(backupLock != null)
- {
- backupLock.release();
- }
- }
-}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -14,7 +14,7 @@
package org.hornetq.tests.integration.cluster;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.util.ServiceTestBase;
import java.util.ArrayList;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -41,8 +41,8 @@
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
/**
* A BridgeReconnectTest
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -57,7 +57,7 @@
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -34,8 +34,8 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -21,8 +21,8 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import java.util.ArrayList;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -41,7 +41,7 @@
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -26,7 +26,7 @@
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -24,8 +24,8 @@
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.security.Role;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.TestableServer;
/**
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -27,7 +27,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
Copied: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java (from rev 11733, branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java)
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.util;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.util.concurrent.Semaphore;
+
+import static org.hornetq.tests.integration.cluster.util.InVMNodeManager.State.*;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 13, 2010
+ * Time: 3:55:47 PM
+ */
+public class InVMNodeManager extends NodeManager
+{
+
+ private Semaphore liveLock;
+
+ private Semaphore backupLock;
+
+ public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
+
+ public State state = NOT_STARTED;
+
+ public InVMNodeManager()
+ {
+ liveLock = new Semaphore(1);
+ backupLock = new Semaphore(1);
+ uuid = UUIDGenerator.getInstance().generateUUID();
+ nodeID = new SimpleString(uuid.toString());
+ }
+
+ @Override
+ public void awaitLiveNode() throws Exception
+ {
+ do
+ {
+ while (state == NOT_STARTED)
+ {
+ Thread.sleep(2000);
+ }
+
+ liveLock.acquire();
+
+ if (state == PAUSED)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == FAILING_BACK)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == LIVE)
+ {
+ break;
+ }
+ }
+ while (true);
+ }
+
+ @Override
+ public void startBackup() throws Exception
+ {
+ backupLock.acquire();
+ }
+
+ @Override
+ public void startLiveNode() throws Exception
+ {
+ state = FAILING_BACK;
+ liveLock.acquire();
+ state = LIVE;
+ }
+
+ @Override
+ public void pauseLiveServer() throws Exception
+ {
+ state = PAUSED;
+ liveLock.release();
+ }
+
+ @Override
+ public void crashLiveServer() throws Exception
+ {
+ //overkill as already set to live
+ state = LIVE;
+ liveLock.release();
+ }
+
+ @Override
+ public void stopBackup() throws Exception
+ {
+ backupLock.release();
+ }
+
+ @Override
+ public void releaseBackup()
+ {
+ releaseBackupNode();
+ }
+
+ @Override
+ public boolean isAwaitingFailback() throws Exception
+ {
+ return state == FAILING_BACK;
+ }
+
+ @Override
+ public boolean isBackupLive() throws Exception
+ {
+ return liveLock.availablePermits() == 0;
+ }
+
+ @Override
+ public void interrupt()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ private void releaseBackupNode()
+ {
+ if(backupLock != null)
+ {
+ backupLock.release();
+ }
+ }
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-11-21 23:39:18 UTC (rev 11734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2011-11-22 01:50:41 UTC (rev 11735)
@@ -47,7 +47,6 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
@@ -55,6 +54,7 @@
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.jms.server.management.JMSUtil;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
13 years, 1 month
JBoss hornetq SVN: r11734 - in branches/Branch_2_2_AS7/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 18:39:18 -0500 (Mon, 21 Nov 2011)
New Revision: 11734
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-6030 - Avoiding distributed deadlock on receiveImmediate()
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-21 23:09:13 UTC (rev 11733)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-21 23:39:18 UTC (rev 11734)
@@ -398,9 +398,15 @@
{
checkClosed();
- SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
-
- channel.send(request);
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+ channel.send(request);
+ }
+ });
}
public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-21 23:09:13 UTC (rev 11733)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-21 23:39:18 UTC (rev 11734)
@@ -389,33 +389,30 @@
{
promptDelivery();
- Future future = new Future();
-
- messageQueue.getExecutor().execute(future);
-
- boolean ok = future.await(10000);
-
- if (!ok)
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ messageQueue.getExecutor().execute(new Runnable()
{
- log.warn("Timed out waiting for executor");
- }
+ public void run()
+ {
+ try
+ {
+ // We execute this on the same executor to make sure the force delivery message is written after
+ // any delivery is completed
- try
- {
- // We execute this on the same executor to make sure the force delivery message is written after
- // any delivery is completed
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
- ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
- forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ catch (Exception e)
+ {
+ ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+ }
+ }
+ });
- callback.sendMessage(forcedDeliveryMessage, id, 0);
- }
- catch (Exception e)
- {
- ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
- }
}
public LinkedList<MessageReference> cancelRefs(final boolean failed,
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-21 23:09:13 UTC (rev 11733)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-21 23:39:18 UTC (rev 11734)
@@ -569,7 +569,11 @@
{
ServerConsumer consumer = consumers.get(consumerID);
- consumer.forceDelivery(sequence);
+ // this would be possible if the server consumer was closed by pings/pongs.. etc
+ if (consumer != null)
+ {
+ consumer.forceDelivery(sequence);
+ }
}
public void acknowledge(final long consumerID, final long messageID) throws Exception
13 years, 1 month