JBoss hornetq SVN: r9731 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-29 03:18:30 -0400 (Wed, 29 Sep 2010)
New Revision: 9731
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
force connect loop to stop before closing sessions to avoid deadlock
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-29 07:17:51 UTC (rev 9730)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-29 07:18:30 UTC (rev 9731)
@@ -399,8 +399,11 @@
return;
}
+
synchronized (createSessionLock)
{
+ //we need to stopthe factory from connecting if it is in the middle aof trying to failover before we get the lock
+ causeExit();
synchronized (failoverLock)
{
// work on a copied set. the session will be removed from sessions when session.close() is called
@@ -420,8 +423,6 @@
}
}
- causeExit();
-
closed = true;
}
14 years, 2 months
JBoss hornetq SVN: r9730 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-29 03:17:51 -0400 (Wed, 29 Sep 2010)
New Revision: 9730
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
added list of allowed connectors when allowdirectconnections is used
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-29 07:16:46 UTC (rev 9729)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-29 07:17:51 UTC (rev 9730)
@@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.Pair;
@@ -98,6 +99,8 @@
private final TransportConfiguration connector;
private final boolean allowsDirectConnectionsOnly;
+
+ private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connector,
@@ -140,9 +143,18 @@
}
// a cluster connection will connect to other nodes only if they are directly connected
- // through a static list of connectors
- allowsDirectConnectionsOnly = (serverLocator.getStaticTransportConfigurations() != null);
- } else
+ // through a static list of connectors or broadcasting using UDP.
+ TransportConfiguration[] transportConfigurations = serverLocator.getStaticTransportConfigurations();
+ allowsDirectConnectionsOnly = (transportConfigurations != null);
+ if(allowsDirectConnectionsOnly)
+ {
+ for (TransportConfiguration transportConfiguration : transportConfigurations)
+ {
+ allowableConnections.add(transportConfiguration);
+ }
+ }
+ }
+ else
{
allowsDirectConnectionsOnly = false;
}
@@ -351,7 +363,7 @@
server.getClusterManager().notifyNodeUp(nodeID, sourceNodeID, connectorPair, last, distance);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowsDirectConnectionsOnly && distance > 1)
+ if (allowsDirectConnectionsOnly && distance > 1 && !allowableConnections.contains(connectorPair.a))
{
return;
}
14 years, 2 months
JBoss hornetq SVN: r9729 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-29 03:16:46 -0400 (Wed, 29 Sep 2010)
New Revision: 9729
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
Log:
fixed test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-09-28 16:27:31 UTC (rev 9728)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-09-29 07:16:46 UTC (rev 9729)
@@ -115,14 +115,14 @@
protected void setupServers() throws Exception
{
// The backups
- setupServer(0, isFileStorage(), isNetty(), true);
- setupServer(1, isFileStorage(), isNetty(), true);
- setupServer(2, isFileStorage(), isNetty(), true);
+ setupServer(0, isFileStorage(), isNetty(), true, -1, true);
+ setupServer(1, isFileStorage(), isNetty(), true, -1, true);
+ setupServer(2, isFileStorage(), isNetty(), true, -1, true);
// The lives
- setupServer(3, isFileStorage(), isNetty(), 0);
- setupServer(4, isFileStorage(), isNetty(), 1);
- setupServer(5, isFileStorage(), isNetty(), 2);
+ setupServer(3, isFileStorage(), isNetty(), 0, true);
+ setupServer(4, isFileStorage(), isNetty(), 1, true);
+ setupServer(5, isFileStorage(), isNetty(), 2, true);
}
14 years, 2 months
JBoss hornetq SVN: r9728 - trunk/src/config/jboss-as-4/clustered.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-28 12:27:31 -0400 (Tue, 28 Sep 2010)
New Revision: 9728
Added:
trunk/src/config/jboss-as-4/clustered/login-config.xml
Modified:
trunk/src/config/jboss-as-4/clustered/jms-ds.xml
Log:
https://jira.jboss.org/browse/HORNETQ-460
Modified: trunk/src/config/jboss-as-4/clustered/jms-ds.xml
===================================================================
--- trunk/src/config/jboss-as-4/clustered/jms-ds.xml 2010-09-28 14:04:04 UTC (rev 9727)
+++ trunk/src/config/jboss-as-4/clustered/jms-ds.xml 2010-09-28 16:27:31 UTC (rev 9728)
@@ -22,10 +22,6 @@
<attribute name="FactoryRef">java:/XAConnectionFactory</attribute>
<attribute name="QueueFactoryRef">java:/XAConnectionFactory</attribute>
<attribute name="TopicFactoryRef">java:/XAConnectionFactory</attribute>
- <attribute name="Properties">
- java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
- java.naming.factory.url.pkgs=org.jnp.interfaces
- java.naming.provider.url=localhost:1199
</attribute>
</mbean>
Added: trunk/src/config/jboss-as-4/clustered/login-config.xml
===================================================================
--- trunk/src/config/jboss-as-4/clustered/login-config.xml (rev 0)
+++ trunk/src/config/jboss-as-4/clustered/login-config.xml 2010-09-28 16:27:31 UTC (rev 9728)
@@ -0,0 +1,159 @@
+<?xml version='1.0'?>
+
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<!-- The XML based JAAS login configuration read by the
+org.jboss.security.auth.login.XMLLoginConfig mbean. Add
+an application-policy element for each security domain.
+
+The outline of the application-policy is:
+<application-policy name="security-domain-name">
+ <authentication>
+ <login-module code="login.module1.class.name" flag="control_flag">
+ <module-option name = "option1-name">option1-value</module-option>
+ <module-option name = "option2-name">option2-value</module-option>
+ ...
+ </login-module>
+
+ <login-module code="login.module2.class.name" flag="control_flag">
+ ...
+ </login-module>
+ ...
+ </authentication>
+</application-policy>
+
+$Id: login-config.xml 76444 2008-07-29 23:50:53Z sguilhen(a)redhat.com $
+$Revision: 76444 $
+-->
+
+<policy>
+ <!-- Used by clients within the application server VM such as
+ mbeans and servlets that access EJBs.
+ -->
+ <application-policy name="client-login">
+ <authentication>
+ <login-module code="org.jboss.security.ClientLoginModule"
+ flag="required">
+ <!-- Any existing security context will be restored on logout -->
+ <module-option name="restore-login-identity">true</module-option>
+ </login-module>
+ </authentication>
+ </application-policy>
+
+ <!-- Security domains for testing new jca framework -->
+ <application-policy name="HsqlDbRealm">
+ <authentication>
+ <login-module code="org.jboss.resource.security.ConfiguredIdentityLoginModule"
+ flag="required">
+ <module-option name="principal">sa</module-option>
+ <module-option name="userName">sa</module-option>
+ <module-option name="password"></module-option>
+ <module-option name="managedConnectionFactoryName">jboss.jca:service=LocalTxCM,name=DefaultDS</module-option>
+ </login-module>
+ </authentication>
+ </application-policy>
+
+ <application-policy name="JmsXARealm">
+ <authentication>
+ <login-module code="org.jboss.resource.security.ConfiguredIdentityLoginModule"
+ flag="required">
+ <module-option name="principal">guest</module-option>
+ <module-option name="userName">guest</module-option>
+ <module-option name="password">guest</module-option>
+ <module-option name="managedConnectionFactoryName">jboss.jca:service=TxCM,name=JmsXA</module-option>
+ </login-module>
+ </authentication>
+ </application-policy>
+
+ <!-- A template configuration for hornetq. This
+ defaults to the UsersRolesLoginModule the same as other and should be
+ changed to a stronger authentication mechanism as required.
+ -->
+<application-policy name="hornetq">
+ <authentication>
+ <login-module code="org.jboss.security.auth.spi.UsersRolesLoginModule"
+ flag="required">
+ <module-option name = "unauthenticatedIdentity">guest</module-option>
+ <module-option name="usersProperties">props/hornetq-users.properties</module-option>
+ <module-option name="rolesProperties">props/hornetq-roles.properties</module-option>
+ </login-module>
+ </authentication>
+ </application-policy>
+
+ <!-- A template configuration for the jmx-console web application. This
+ defaults to the UsersRolesLoginModule the same as other and should be
+ changed to a stronger authentication mechanism as required.
+ -->
+ <application-policy name="jmx-console">
+ <authentication>
+ <login-module code="org.jboss.security.auth.spi.UsersRolesLoginModule"
+ flag="required">
+ <module-option name="usersProperties">props/jmx-console-users.properties</module-option>
+ <module-option name="rolesProperties">props/jmx-console-roles.properties</module-option>
+ </login-module>
+ </authentication>
+ </application-policy>
+
+ <!-- A template configuration for the web-console web application. This
+ defaults to the UsersRolesLoginModule the same as other and should be
+ changed to a stronger authentication mechanism as required.
+ -->
+ <application-policy name="web-console">
+ <authentication>
+ <login-module code="org.jboss.security.auth.spi.UsersRolesLoginModule"
+ flag="required">
+ <module-option name="usersProperties">web-console-users.properties</module-option>
+ <module-option name="rolesProperties">web-console-roles.properties</module-option>
+ </login-module>
+ </authentication>
+ </application-policy>
+
+ <!--
+ A template configuration for the JBossWS security domain.
+ This defaults to the UsersRolesLoginModule the same as other and should be
+ changed to a stronger authentication mechanism as required.
+ -->
+ <application-policy name="JBossWS">
+ <authentication>
+ <login-module code="org.jboss.security.auth.spi.UsersRolesLoginModule"
+ flag="required">
+ <module-option name="usersProperties">props/jbossws-users.properties</module-option>
+ <module-option name="rolesProperties">props/jbossws-roles.properties</module-option>
+ <module-option name="unauthenticatedIdentity">anonymous</module-option>
+ </login-module>
+ </authentication>
+ </application-policy>
+
+ <!-- The default login configuration used by any security domain that
+ does not have a application-policy entry with a matching name
+ -->
+ <application-policy name="other">
+ <!-- A simple server login module, which can be used when the number
+ of users is relatively small. It uses two properties files:
+ users.properties, which holds users (key) and their password (value).
+ roles.properties, which holds users (key) and a comma-separated list of
+ their roles (value).
+ The unauthenticatedIdentity property defines the name of the principal
+ that will be used when a null username and password are presented as is
+ the case for an unuathenticated web client or MDB. If you want to
+ allow such users to be authenticated add the property, e.g.,
+ unauthenticatedIdentity="nobody"
+ -->
+ <authentication>
+ <login-module code="org.jboss.security.auth.spi.UsersRolesLoginModule"
+ flag="required"/>
+ </authentication>
+ </application-policy>
+
+</policy>
14 years, 2 months
JBoss hornetq SVN: r9727 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-28 10:04:04 -0400 (Tue, 28 Sep 2010)
New Revision: 9727
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
Log:
HA fixes for multiple backups
* set *initial* reconnect attempts to -1 on the server locator that the backup uses to connect to the live server
* update backup topology with notifications received from the connection to the live server
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-28 09:59:05 UTC (rev 9726)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-28 14:04:04 UTC (rev 9727)
@@ -1211,6 +1211,11 @@
}
}
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.add(listener);
@@ -1320,7 +1325,7 @@
factory = getFactory();
try
{
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-28 09:59:05 UTC (rev 9726)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-28 14:04:04 UTC (rev 9727)
@@ -57,4 +57,6 @@
void setBackup(boolean backup);
void setInitialConnectAttempts(int reconnectAttempts);
+
+ Topology getTopology();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-28 09:59:05 UTC (rev 9726)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-28 14:04:04 UTC (rev 9727)
@@ -50,7 +50,6 @@
}
else
{
- System.out.println("current=" + currentMember + ", new=" + member);
if(hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
{
currentMember.getConnector().a = member.getConnector().a;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-28 09:59:05 UTC (rev 9726)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-28 14:04:04 UTC (rev 9727)
@@ -133,7 +133,7 @@
this.serverLocator.setClusterConnection(true);
this.serverLocator.setClusterTransportConfiguration(connector);
this.serverLocator.setBackup(server.getConfiguration().isBackup());
- this.serverLocator.setReconnectAttempts(-1);
+ this.serverLocator.setInitialConnectAttempts(-1);
if(retryInterval > 0)
{
this.serverLocator.setRetryInterval(retryInterval);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-28 09:59:05 UTC (rev 9726)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-28 14:04:04 UTC (rev 9727)
@@ -209,14 +209,15 @@
{
public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
- //todo update the topology
+ notifyNodeUp(nodeID, sourceNodeID, connectorPair, last, distance);
}
public void nodeDown(String nodeID)
{
- //todo update the topology
+ notifyNodeDown(nodeID);
}
});
+ locator.setNodeID(nodeUUID.toString());
backupSessionFactory = locator.connect();
backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-28 09:59:05 UTC (rev 9726)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-28 14:04:04 UTC (rev 9727)
@@ -33,6 +33,7 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -158,6 +159,10 @@
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ if (!ok)
+ {
+ System.out.println(((ServerLocatorInternal)locator).getTopology().describe());
+ }
assertTrue(ok);
return sf;
}
14 years, 2 months
JBoss hornetq SVN: r9726 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-09-28 05:59:05 -0400 (Tue, 28 Sep 2010)
New Revision: 9726
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
added check for wrapped exception
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-28 09:33:31 UTC (rev 9725)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-28 09:59:05 UTC (rev 9726)
@@ -611,11 +611,14 @@
}
catch (InterruptedException e)
{
- System.out.println("HornetQServerImpl$SharedStoreBackupActivation.run");
+ //this is ok, we are being stopped
}
catch (Exception e)
{
- log.error("Failure in initialisation", e);
+ if(!(e.getCause() instanceof InterruptedException))
+ {
+ log.error("Failure in initialisation", e);
+ }
}
}
14 years, 2 months
JBoss hornetq SVN: r9725 - in trunk: tests/src/org/hornetq/tests/integration/jms/connection and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-28 05:33:31 -0400 (Tue, 28 Sep 2010)
New Revision: 9725
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
Log:
https://jira.jboss.org/browse/HORNETQ-525
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-09-27 12:27:02 UTC (rev 9724)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-09-28 09:33:31 UTC (rev 9725)
@@ -216,7 +216,7 @@
justCreated = false;
}
- public void start() throws JMSException
+ public synchronized void start() throws JMSException
{
checkClosed();
@@ -229,7 +229,7 @@
started = true;
}
- public void stop() throws JMSException
+ public synchronized void stop() throws JMSException
{
checkClosed();
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-09-27 12:27:02 UTC (rev 9724)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-09-28 09:33:31 UTC (rev 9725)
@@ -111,11 +111,11 @@
// Constructors --------------------------------------------------
protected HornetQSession(final HornetQConnection connection,
- final boolean transacted,
- final boolean xa,
- final int ackMode,
- final ClientSession session,
- final int sessionType)
+ final boolean transacted,
+ final boolean xa,
+ final int ackMode,
+ final ClientSession session,
+ final int sessionType)
{
this.connection = connection;
@@ -213,7 +213,7 @@
return ackMode;
}
-
+
public boolean isXA()
{
return xa;
@@ -262,21 +262,24 @@
public void close() throws JMSException
{
- try
+ synchronized (connection)
{
- for (HornetQMessageConsumer cons : new HashSet<HornetQMessageConsumer>(consumers))
+ try
{
- cons.close();
- }
+ for (HornetQMessageConsumer cons : new HashSet<HornetQMessageConsumer>(consumers))
+ {
+ cons.close();
+ }
- session.close();
+ session.close();
- connection.removeSession(this);
+ connection.removeSession(this);
+ }
+ catch (HornetQException e)
+ {
+ throw JMSExceptionHelper.convertFromHornetQException(e);
+ }
}
- catch (HornetQException e)
- {
- throw JMSExceptionHelper.convertFromHornetQException(e);
- }
}
public void recover() throws JMSException
@@ -393,7 +396,7 @@
try
{
HornetQQueue queue = lookupQueue(queueName, false);
-
+
if (queue == null)
{
queue = lookupQueue(queueName, true);
@@ -413,7 +416,6 @@
throw JMSExceptionHelper.convertFromHornetQException(e);
}
}
-
public Topic createTopic(final String topicName) throws JMSException
{
@@ -423,7 +425,6 @@
throw new IllegalStateException("Cannot create a topic on a QueueSession");
}
-
try
{
HornetQTopic topic = lookupTopic(topicName, false);
@@ -477,7 +478,7 @@
}
HornetQDestination jbdest = (HornetQDestination)topic;
-
+
if (jbdest.isQueue())
{
throw new InvalidDestinationException("Cannot create a subscriber on a queue");
@@ -490,7 +491,7 @@
final String subscriptionName,
String selectorString,
final boolean noLocal) throws JMSException
- {
+ {
try
{
selectorString = "".equals(selectorString) ? null : selectorString;
@@ -525,7 +526,7 @@
SimpleString autoDeleteQueueName = null;
if (dest.isQueue())
- {
+ {
BindingQuery response = session.bindingQuery(dest.getSimpleAddress());
if (!response.isExists())
@@ -573,7 +574,7 @@
}
queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
- subscriptionName));
+ subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName);
@@ -678,10 +679,10 @@
}
HornetQDestination jbq = (HornetQDestination)queue;
-
+
if (!jbq.isQueue())
{
- throw new InvalidDestinationException("Cannot create a browser on a topic");
+ throw new InvalidDestinationException("Cannot create a browser on a topic");
}
try
@@ -767,7 +768,7 @@
}
SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(connection.getClientID(),
- name));
+ name));
try
{
@@ -887,7 +888,7 @@
{
throw new InvalidDestinationException("Not a temporary topic " + tempTopic);
}
-
+
try
{
BindingQuery response = session.bindingQuery(tempTopic.getSimpleAddress());
@@ -949,7 +950,7 @@
throw JMSExceptionHelper.convertFromHornetQException(e);
}
}
-
+
public void start() throws JMSException
{
try
@@ -991,11 +992,11 @@
}
catch (HornetQException ignore)
{
- //Exception on deleting queue shouldn't prevent close from completing
+ // Exception on deleting queue shouldn't prevent close from completing
}
}
}
-
+
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
@@ -1007,11 +1008,11 @@
throw new IllegalStateException("Session is closed");
}
}
-
+
private HornetQQueue lookupQueue(final String queueName, boolean isTemporary) throws HornetQException
{
HornetQQueue queue;
-
+
if (isTemporary)
{
queue = HornetQDestination.createTemporaryQueue(queueName);
@@ -1020,7 +1021,7 @@
{
queue = HornetQDestination.createQueue(queueName);
}
-
+
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
if (response.isExists())
@@ -1032,12 +1033,12 @@
return null;
}
}
-
+
private HornetQTopic lookupTopic(final String topicName, final boolean isTemporary) throws HornetQException
{
HornetQTopic topic;
-
+
if (isTemporary)
{
topic = HornetQDestination.createTemporaryTopic(topicName);
@@ -1046,7 +1047,7 @@
{
topic = HornetQDestination.createTopic(topicName);
}
-
+
BindingQuery query = session.bindingQuery(topic.getSimpleAddress());
if (!query.isExists())
@@ -1059,7 +1060,6 @@
}
}
-
// Inner classes -------------------------------------------------
}
Added: trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/ConcurrentSessionCloseTest.java 2010-09-28 09:33:31 UTC (rev 9725)
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A ConcurrentSessionCloseTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ConcurrentSessionCloseTest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(ConcurrentSessionCloseTest.class);
+
+ private HornetQConnectionFactory cf;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ cf = null;
+
+ super.tearDown();
+ }
+
+ // https://jira.jboss.org/browse/HORNETQ-525
+ public void testConcurrentClose() throws Exception
+ {
+ final Connection con = cf.createConnection();
+
+ for (int j = 0; j < 100; j++)
+ {
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ int threadCount = 10;
+
+ ThreadGroup group = new ThreadGroup("Test");
+
+ Thread[] threads = new Thread[threadCount];
+
+ for (int i = 0; i < threadCount; i++)
+ {
+ threads[i] = new Thread(group, "thread " + i)
+ {
+ public void run()
+ {
+ try
+ {
+ con.start();
+
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ session.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+
+ failed.set(true);
+ }
+
+ };
+ };
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threadCount; i++)
+ {
+ threads[i].join();
+ }
+
+ assertFalse(failed.get());
+ }
+
+ jmsServer.stop();
+ }
+
+}
14 years, 2 months
JBoss hornetq SVN: r9724 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-27 08:27:02 -0400 (Mon, 27 Sep 2010)
New Revision: 9724
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-450
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-27 09:37:14 UTC (rev 9723)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-27 12:27:02 UTC (rev 9724)
@@ -16,7 +16,6 @@
import java.io.File;
import java.util.Iterator;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
@@ -728,6 +727,7 @@
{
ClientConsumerImpl.log.trace("Adding Runner on Executor for delivery");
}
+
sessionExecutor.execute(runner);
}
@@ -805,6 +805,12 @@
if (message != null)
{
+ if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+ {
+ //Ignore, this could be a relic from a previous receiveImmediate();
+ return;
+ }
+
boolean expired = message.isExpired();
flowControlBeforeConsumption(message);
@@ -932,7 +938,6 @@
{
public void run()
{
-
try
{
callOnMessage();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-27 09:37:14 UTC (rev 9723)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-27 12:27:02 UTC (rev 9724)
@@ -12,6 +12,8 @@
*/
package org.hornetq.tests.integration.client;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
@@ -22,6 +24,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
@@ -153,7 +156,52 @@
sf.close();
}
+
+ // https://jira.jboss.org/browse/HORNETQ-450
+ public void testReceivedImmediateFollowedByAsyncConsume() throws Exception
+ {
+ sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonDurableSend(true);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage message = session.createMessage(false);
+
+ producer.send(message);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+ session.start();
+
+ ClientMessage received = consumer.receiveImmediate();
+
+ assertNotNull(received);
+
+ received.acknowledge();
+
+ final AtomicBoolean receivedAsync = new AtomicBoolean(false);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ receivedAsync.set(true);
+ }
+ });
+
+ Thread.sleep(1000);
+
+ assertFalse(receivedAsync.get());
+
+ session.close();
+
+ sf.close();
+ }
+
private void doConsumerReceiveImmediateWithNoMessages(final boolean browser) throws Exception
{
sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
14 years, 2 months
JBoss hornetq SVN: r9723 - in trunk: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-27 05:37:14 -0400 (Mon, 27 Sep 2010)
New Revision: 9723
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-450
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-09-27 09:37:14 UTC (rev 9723)
@@ -113,7 +113,7 @@
private boolean stopped = false;
- private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+ private long forceDeliveryCount;
private final SessionQueueQueryResponseMessage queueInfo;
@@ -226,7 +226,7 @@
// we only force delivery once per call to receive
if (!deliveryForced)
{
- session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
+ session.forceDelivery(id, forceDeliveryCount++);
deliveryForced = true;
}
@@ -260,18 +260,17 @@
if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
{
long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
- if (seq >= forceDeliveryCount.longValue())
+
+ if (forcingDelivery && seq == forceDeliveryCount - 1)
{
// forced delivery messages are discarded, nothing has been delivered by the queue
- if (forcingDelivery)
- {
- resetIfSlowConsumer();
- return null;
- }
+ resetIfSlowConsumer();
+
+ return null;
}
else
{
- // ignore any previous forced delivery message
+ // Ignore the message
continue;
}
}
@@ -425,7 +424,7 @@
lastAckedMessage = null;
creditsToSend = 0;
-
+
ackIndividually = false;
}
@@ -468,7 +467,7 @@
{
return browseOnly;
}
-
+
public synchronized void handleMessage(final ClientMessageInternal message) throws Exception
{
if (closing)
@@ -571,7 +570,7 @@
while (iter.hasNext())
{
ClientMessageInternal message = iter.next();
-
+
flowControlBeforeConsumption(message);
}
@@ -603,7 +602,7 @@
{
flushAcks();
}
-
+
session.individualAcknowledge(id, message.getMessageID());
}
else
@@ -708,7 +707,7 @@
private void resetIfSlowConsumer()
{
- if(clientWindowSize == 0)
+ if (clientWindowSize == 0)
{
slowConsumerInitialCreditSent = false;
sendCredits(0);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-27 09:37:14 UTC (rev 9723)
@@ -73,7 +73,7 @@
}
/*
- * Construct a MessageImpl from storage, or notification
+ * Construct a MessageImpl from storage, or notification, or before routing
*/
public ServerMessageImpl(final long messageID, final int initialMessageBufferSize)
{
@@ -82,11 +82,6 @@
this.messageID = messageID;
}
- protected ServerMessageImpl(final int initialMessageBufferSize)
- {
- super(initialMessageBufferSize);
- }
-
/*
* Copy constructor
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-24 12:16:25 UTC (rev 9722)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2010-09-27 09:37:14 UTC (rev 9723)
@@ -16,7 +16,12 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
@@ -113,7 +118,42 @@
sf.close();
}
+
+ // https://jira.jboss.org/browse/HORNETQ-450
+ public void testReceivedImmediateFollowedByReceive() throws Exception
+ {
+ sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonDurableSend(true);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage message = session.createMessage(false);
+
+ producer.send(message);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+ session.start();
+
+ ClientMessage received = consumer.receiveImmediate();
+
+ assertNotNull(received);
+
+ received.acknowledge();
+
+ received = consumer.receive(1);
+
+ assertNull(received);
+
+ session.close();
+
+ sf.close();
+ }
+
private void doConsumerReceiveImmediateWithNoMessages(final boolean browser) throws Exception
{
sf = HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
14 years, 3 months
JBoss hornetq SVN: r9722 - in trunk: tests/jms-tests/src/org/hornetq/jms/tests/selector and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-24 08:16:25 -0400 (Fri, 24 Sep 2010)
New Revision: 9722
Modified:
trunk/src/main/org/hornetq/jms/client/SelectorTranslator.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/selector/SelectorTest.java
trunk/tests/src/org/hornetq/tests/unit/jms/client/SelectorTranslatorTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-528
Modified: trunk/src/main/org/hornetq/jms/client/SelectorTranslator.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/SelectorTranslator.java 2010-09-24 07:35:56 UTC (rev 9721)
+++ trunk/src/main/org/hornetq/jms/client/SelectorTranslator.java 2010-09-24 12:16:25 UTC (rev 9722)
@@ -50,6 +50,7 @@
filterString = SelectorTranslator.parse(filterString, "JMSPriority", "HQPriority");
filterString = SelectorTranslator.parse(filterString, "JMSTimestamp", "HQTimestamp");
filterString = SelectorTranslator.parse(filterString, "JMSMessageID", "HQUserID");
+ filterString = SelectorTranslator.parse(filterString, "JMSExpiration", "HQExpiration");
return filterString;
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/selector/SelectorTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/selector/SelectorTest.java 2010-09-24 07:35:56 UTC (rev 9721)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/selector/SelectorTest.java 2010-09-24 12:16:25 UTC (rev 9722)
@@ -846,6 +846,54 @@
}
}
+ public void testJMSExpirationOnSelector() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = getConnectionFactory().createConnection();
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = session.createProducer(HornetQServerTestCase.queue1);
+
+ TextMessage msg1 = session.createTextMessage("msg1");
+ prod.send(msg1);
+
+ prod.setTimeToLive(100000);
+
+ TextMessage msg2 = session.createTextMessage("msg2");
+
+ prod.send(msg2);
+
+ long expire = msg2.getJMSExpiration();
+
+ String selector = "JMSExpiration = " + expire;
+
+ MessageConsumer cons = session.createConsumer(HornetQServerTestCase.queue1, selector);
+
+ conn.start();
+
+ TextMessage rec = (TextMessage)cons.receive(10000);
+
+ assertNotNull(rec);
+
+ assertEquals("msg2", rec.getText());
+
+ assertNull(cons.receiveNoWait());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
public void testJMSTypeOnSelector() throws Exception
{
Connection conn = null;
Modified: trunk/tests/src/org/hornetq/tests/unit/jms/client/SelectorTranslatorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/jms/client/SelectorTranslatorTest.java 2010-09-24 07:35:56 UTC (rev 9721)
+++ trunk/tests/src/org/hornetq/tests/unit/jms/client/SelectorTranslatorTest.java 2010-09-24 12:16:25 UTC (rev 9722)
@@ -141,7 +141,31 @@
SelectorTranslator.convertToHornetQFilterString(selector));
}
+
+ public void testParseJMSExpiration()
+ {
+ String selector = "JMSExpiration=12345678";
+ Assert.assertEquals("HQExpiration=12345678", SelectorTranslator.convertToHornetQFilterString(selector));
+
+ selector = " JMSExpiration=12345678";
+
+ Assert.assertEquals(" HQExpiration=12345678", SelectorTranslator.convertToHornetQFilterString(selector));
+
+ selector = " JMSExpiration=12345678 OR 78766 = JMSExpiration AND (JMSExpiration= 1 + 4878787)";
+
+ Assert.assertEquals(" HQExpiration=12345678 OR 78766 = HQExpiration AND (HQExpiration= 1 + 4878787)",
+ SelectorTranslator.convertToHornetQFilterString(selector));
+
+ checkNoSubstitute("JMSExpiration");
+
+ selector = "animal = 'lion' JMSExpiration = 321 OR animal_name = 'xyzJMSExpirationxyz'";
+
+ Assert.assertEquals("animal = 'lion' HQExpiration = 321 OR animal_name = 'xyzJMSExpirationxyz'",
+ SelectorTranslator.convertToHornetQFilterString(selector));
+
+ }
+
public void testParseJMSCorrelationID()
{
String selector = "JMSCorrelationID='ID:HQ-12435678";
@@ -191,6 +215,8 @@
checkNoSubstitute("JMSType");
}
+
+
// Private -------------------------------------------------------------------------------------
14 years, 3 months