JBoss hornetq SVN: r12075 - trunk/hornetq-core/src/main/java/org/hornetq/core/server/management/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-03 06:51:32 -0500 (Fri, 03 Feb 2012)
New Revision: 12075
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
Log:
https://issues.jboss.org/browse/HORNETQ-821 - added synchronization
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2012-02-03 11:49:25 UTC (rev 12074)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2012-02-03 11:51:32 UTC (rev 12075)
@@ -472,12 +472,12 @@
return reply;
}
- public Object getResource(final String resourceName)
+ public synchronized Object getResource(final String resourceName)
{
return registry.get(resourceName);
}
- public Object[] getResources(final Class<?> resourceType)
+ public synchronized Object[] getResources(final Class<?> resourceType)
{
List<Object> resources = new ArrayList<Object>();
Collection<Object> clone = new ArrayList<Object>(registry.values());
12 years, 11 months
JBoss hornetq SVN: r12074 - in trunk: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 7 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-03 06:49:25 -0500 (Fri, 03 Feb 2012)
New Revision: 12074
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/security/HornetQPrincipal.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/RemotingService.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java
trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connection.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
Log:
https://issues.jboss.org/browse/HORNETQ-841 - added support for invm unsecure connections
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -13,6 +13,7 @@
package org.hornetq.core.protocol.core;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -100,4 +101,11 @@
* Called periodically to flush any data in the batch buffer
*/
void checkFlushBatchBuffer();
+
+ /**
+ * get the default security principal
+ *
+ * @return the principal
+ */
+ HornetQPrincipal getDefaultHornetQPrincipal();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -33,6 +33,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -461,6 +462,11 @@
transportConnection.checkFlushBatchBuffer();
}
+ public HornetQPrincipal getDefaultHornetQPrincipal()
+ {
+ return transportConnection.getDefaultHornetQPrincipal();
+ }
+
// Buffer Handler implementation
// ----------------------------------------------------
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
@@ -64,6 +65,8 @@
private final Map<String, Object> configuration;
+ private HornetQPrincipal defaultHornetQPrincipal;
+
public InVMAcceptor(final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
final BufferHandler handler,
@@ -209,7 +212,7 @@
throw new IllegalStateException("Acceptor is not started");
}
- new InVMConnection(this, id, connectionID, remoteHandler, new Listener(connector), clientExecutor);
+ new InVMConnection(this, id, connectionID, remoteHandler, new Listener(connector), clientExecutor, defaultHornetQPrincipal);
}
public void disconnect(final String connectionID)
@@ -227,6 +230,21 @@
}
}
+ /**
+ * we are InVM so allow unsecure connections
+ *
+ * @return true
+ */
+ public boolean isUnsecurable()
+ {
+ return true;
+ }
+
+ public void setDefaultHornetQPrincipal(HornetQPrincipal defaultHornetQPrincipal)
+ {
+ this.defaultHornetQPrincipal = defaultHornetQPrincipal;
+ }
+
private class Listener implements ConnectionLifeCycleListener
{
//private static Listener instance = new Listener();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -58,6 +59,8 @@
private volatile boolean closing;
+ private HornetQPrincipal defaultHornetQPrincipal;
+
public InVMConnection(final Acceptor acceptor,
final int serverID,
final BufferHandler handler,
@@ -74,6 +77,17 @@
final ConnectionLifeCycleListener listener,
final Executor executor)
{
+ this(acceptor, serverID, id, handler, listener, executor, null);
+ }
+
+ public InVMConnection(final Acceptor acceptor,
+ final int serverID,
+ final String id,
+ final BufferHandler handler,
+ final ConnectionLifeCycleListener listener,
+ final Executor executor,
+ HornetQPrincipal defaultHornetQPrincipal)
+ {
this.serverID = serverID;
this.handler = handler;
@@ -84,6 +98,8 @@
this.executor = executor;
+ this.defaultHornetQPrincipal = defaultHornetQPrincipal;
+
listener.connectionCreated(acceptor, this, ProtocolType.CORE);
}
@@ -213,7 +229,12 @@
public void removeReadyListener(ReadyListener listener)
{
}
-
+
+ public HornetQPrincipal getDefaultHornetQPrincipal()
+ {
+ return defaultHornetQPrincipal;
+ }
+
public void disableFlush()
{
flushEnabled = false;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -37,6 +37,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
@@ -647,6 +648,25 @@
this.notificationService = notificationService;
}
+ /**
+ * we don't allow this
+ *
+ * @param defaultHornetQPrincipal
+ */
+ public void setDefaultHornetQPrincipal(HornetQPrincipal defaultHornetQPrincipal)
+ {
+ throw new IllegalStateException("unsecure connections not allowed");
+ }
+
+ /**
+ * only InVM acceptors should allow this
+ * @return
+ */
+ public boolean isUnsecurable()
+ {
+ return false;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.spi.core.remoting.Acceptor#getClusterConnection()
*/
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
@@ -262,6 +263,12 @@
readyListeners.remove(listener);
}
+ //never allow this
+ public HornetQPrincipal getDefaultHornetQPrincipal()
+ {
+ return null;
+ }
+
void fireReady(final boolean ready)
{
for (ReadyListener listener: readyListeners)
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/RemotingService.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/RemotingService.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -16,6 +16,7 @@
import java.util.Set;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.spi.core.protocol.RemotingConnection;
/**
@@ -49,5 +50,11 @@
void freeze();
+ /**
+ * allow acceptors to use this as their default security Principal if applicable
+ * @param principal
+ */
+ void allowInvmSecurityOverride(HornetQPrincipal principal);
+
RemotingConnection getServerSideReplicatingConnection();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -36,8 +36,10 @@
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptor;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
@@ -246,6 +248,17 @@
started = true;
}
+ public synchronized void allowInvmSecurityOverride(HornetQPrincipal principal)
+ {
+ for (Acceptor acceptor : acceptors)
+ {
+ if(acceptor.isUnsecurable())
+ {
+ acceptor.setDefaultHornetQPrincipal(principal);
+ }
+ }
+ }
+
public synchronized void freeze()
{
// Used in testing - prevents service taking any more connections
@@ -662,4 +675,4 @@
});
}
-}
\ No newline at end of file
+}
Added: trunk/hornetq-core/src/main/java/org/hornetq/core/security/HornetQPrincipal.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/security/HornetQPrincipal.java (rev 0)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/security/HornetQPrincipal.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -0,0 +1,49 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.core.security;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 1/30/12
+ */
+public class HornetQPrincipal
+{
+ private final String userName;
+
+ private final String password;
+
+ public HornetQPrincipal(String userName, String password)
+ {
+ this.userName = userName;
+ this.password = password;
+ }
+
+ public String getUserName()
+ {
+ return userName;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -15,6 +15,7 @@
import java.util.Map;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.NotificationService;
@@ -46,4 +47,15 @@
* @param notificationService the notification service
*/
void setNotificationService(NotificationService notificationService);
+
+ /**
+ * Set the default security Principal to be used when no user/pass are defined, only for InVM
+ */
+ void setDefaultHornetQPrincipal(HornetQPrincipal defaultHornetQPrincipal);
+
+ /**
+ * does this acceptor allow unsecure connections,
+ * if false @setDefaultHornetQPrincipal should throw an @java.lang.IllegalStateException
+ */
+ boolean isUnsecurable();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connection.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connection.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.security.HornetQPrincipal;
/**
* The connection used by a channel to write data to.
@@ -74,4 +75,9 @@
void addReadyListener(ReadyListener listener);
void removeReadyListener(ReadyListener listener);
+
+ /**
+ * return the default Principal if there is one for this connection
+ */
+ HornetQPrincipal getDefaultHornetQPrincipal();
}
\ No newline at end of file
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2012-02-03 11:23:59 UTC (rev 12073)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2012-02-03 11:49:25 UTC (rev 12074)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.remoting.impl.invm.InVMConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
+import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -103,5 +104,10 @@
super.write(buffer, flush, batch);
}
+
+ public HornetQPrincipal getDefaultHornetQPrincipal()
+ {
+ return null;
+ }
}
}
12 years, 11 months
JBoss hornetq SVN: r12073 - in trunk: tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-03 06:23:59 -0500 (Fri, 03 Feb 2012)
New Revision: 12073
Modified:
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/ConnectionFactoryProperties.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-836 - added local bind address to RA
Modified: trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
--- trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/ConnectionFactoryProperties.java 2012-02-03 11:21:52 UTC (rev 12072)
+++ trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/ConnectionFactoryProperties.java 2012-02-03 11:23:59 UTC (rev 12073)
@@ -52,6 +52,8 @@
private Integer discoveryPort;
+ private String discoveryLocalBindAddress;
+
private Long discoveryRefreshTimeout;
private Long discoveryInitialWaitTimeout;
@@ -183,6 +185,25 @@
return discoveryPort;
}
+ public void setDiscoveryLocalBindAddress(final String discoveryLocalBindAddress)
+ {
+ if (ConnectionFactoryProperties.trace)
+ {
+ ConnectionFactoryProperties.log.trace("setDiscoveryLocalBindAddress(" + discoveryLocalBindAddress + ")");
+ }
+ hasBeenUpdated = true;
+ this.discoveryLocalBindAddress = discoveryLocalBindAddress;
+ }
+
+ public String getDiscoveryLocalBindAddress()
+ {
+ if (ConnectionFactoryProperties.trace)
+ {
+ ConnectionFactoryProperties.log.trace("getDiscoveryLocalBindAddress()");
+ }
+ return discoveryLocalBindAddress;
+ }
+
public void setDiscoveryPort(final Integer discoveryPort)
{
if (ConnectionFactoryProperties.trace)
Modified: trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 11:21:52 UTC (rev 12072)
+++ trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 11:23:59 UTC (rev 12073)
@@ -38,7 +38,6 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.recovery.XARecoveryConfig;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.ra.recovery.RecoveryManager;
@@ -355,6 +354,36 @@
}
/**
+ * set the discovery local bind address
+ *
+ * @param discoveryLocalBindAddress the address value
+ */
+ public void setDiscoveryLocalBindAddress(final String discoveryLocalBindAddress)
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("setDiscoveryLocalBindAddress(" + discoveryLocalBindAddress + ")");
+ }
+
+ raProperties.setDiscoveryLocalBindAddress(discoveryLocalBindAddress);
+ }
+
+ /**
+ * get the discovery local bind address
+ *
+ * @return the address value
+ */
+ public String getDiscoveryLocalBindAddress()
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("getDiscoveryLocalBindAddress()");
+ }
+
+ return raProperties.getDiscoveryLocalBindAddress();
+ }
+
+ /**
* Set the discovery group port
*
* @param dgp The value
@@ -1447,7 +1476,7 @@
{
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
: getDiscoveryPort();
-
+
if(discoveryPort == null)
{
discoveryPort = HornetQClient.DEFAULT_DISCOVERY_PORT;
@@ -1479,6 +1508,11 @@
groupConfiguration.setRefreshTimeout(refreshTimeout);
+ String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress()
+ : raProperties.getDiscoveryLocalBindAddress();
+
+ groupConfiguration.setLocalBindAdress(localBindAddress);
+
if (ha)
{
cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
@@ -1587,6 +1621,11 @@
groupConfiguration.setRefreshTimeout(refreshTimeout);
+ String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress()
+ : raProperties.getDiscoveryLocalBindAddress();
+
+ groupConfiguration.setLocalBindAdress(localBindAddress);
+
cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else
Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2012-02-03 11:21:52 UTC (rev 12072)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2012-02-03 11:23:59 UTC (rev 12073)
@@ -101,6 +101,12 @@
" </config-property>\n" +
" <config-property>\n" +
" <description>The discovery group address</description>\n" +
+ " <config-property-name>DiscoveryLocalBindAddress</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The discovery group local bind address</description>\n" +
" <config-property-name>DiscoveryAddress</config-property-name>\n" +
" <config-property-type>java.lang.String</config-property-type>\n" +
" <config-property-value></config-property-value>\n" +
Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2012-02-03 11:21:52 UTC (rev 12072)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2012-02-03 11:23:59 UTC (rev 12073)
@@ -263,9 +263,13 @@
ConnectionFactoryProperties connectionFactoryProperties = new ConnectionFactoryProperties();
connectionFactoryProperties.setDiscoveryAddress("myhost");
connectionFactoryProperties.setDiscoveryPort(5678);
+ connectionFactoryProperties.setDiscoveryLocalBindAddress("newAddress");
HornetQConnectionFactory factory = ra.createHornetQConnectionFactory(connectionFactoryProperties);
HornetQConnectionFactory defaultFactory = ra.getDefaultHornetQConnectionFactory();
Assert.assertNotSame(factory, defaultFactory);
+ Assert.assertEquals(factory.getDiscoveryGroupConfiguration().getLocalBindAddress(), "newAddress");
+ Assert.assertEquals(factory.getDiscoveryGroupConfiguration().getGroupAddress(), "myhost");
+ Assert.assertEquals(factory.getDiscoveryGroupConfiguration().getGroupPort(), 5678);
}
public void testCreateConnectionFactoryMultipleConnectors()
12 years, 11 months
JBoss hornetq SVN: r12072 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/unit/ra and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-03 06:21:52 -0500 (Fri, 03 Feb 2012)
New Revision: 12072
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-8040 - added local bind address to RA
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2012-02-03 11:13:56 UTC (rev 12071)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2012-02-03 11:21:52 UTC (rev 12072)
@@ -52,6 +52,8 @@
private Integer discoveryPort;
+ private String discoveryLocalBindAddress;
+
private Long discoveryRefreshTimeout;
private Long discoveryInitialWaitTimeout;
@@ -183,6 +185,25 @@
return discoveryPort;
}
+ public void setDiscoveryLocalBindAddress(final String discoveryLocalBindAddress)
+ {
+ if (ConnectionFactoryProperties.trace)
+ {
+ ConnectionFactoryProperties.log.trace("setDiscoveryLocalBindAddress(" + discoveryLocalBindAddress + ")");
+ }
+ hasBeenUpdated = true;
+ this.discoveryLocalBindAddress = discoveryLocalBindAddress;
+ }
+
+ public String getDiscoveryLocalBindAddress()
+ {
+ if (ConnectionFactoryProperties.trace)
+ {
+ ConnectionFactoryProperties.log.trace("getDiscoveryLocalBindAddress()");
+ }
+ return discoveryLocalBindAddress;
+ }
+
public void setDiscoveryPort(final Integer discoveryPort)
{
if (ConnectionFactoryProperties.trace)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 11:13:56 UTC (rev 12071)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 11:21:52 UTC (rev 12072)
@@ -42,7 +42,6 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.recovery.XARecoveryConfig;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.ra.recovery.RecoveryManager;
@@ -371,6 +370,36 @@
}
/**
+ * set the discovery local bind address
+ *
+ * @param discoveryLocalBindAddress the address value
+ */
+ public void setDiscoveryLocalBindAddress(final String discoveryLocalBindAddress)
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("setDiscoveryLocalBindAddress(" + discoveryLocalBindAddress + ")");
+ }
+
+ raProperties.setDiscoveryLocalBindAddress(discoveryLocalBindAddress);
+ }
+
+ /**
+ * get the discovery local bind address
+ *
+ * @return the address value
+ */
+ public String getDiscoveryLocalBindAddress()
+ {
+ if (HornetQResourceAdapter.trace)
+ {
+ HornetQResourceAdapter.log.trace("getDiscoveryLocalBindAddress()");
+ }
+
+ return raProperties.getDiscoveryLocalBindAddress();
+ }
+
+ /**
* Set the discovery group port
*
* @param dgp The value
@@ -1463,7 +1492,7 @@
{
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
: getDiscoveryPort();
-
+
if(discoveryPort == null)
{
discoveryPort = HornetQClient.DEFAULT_DISCOVERY_PORT;
@@ -1495,6 +1524,11 @@
groupConfiguration.setRefreshTimeout(refreshTimeout);
+ String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress()
+ : raProperties.getDiscoveryLocalBindAddress();
+
+ groupConfiguration.setLocalBindAdress(localBindAddress);
+
if (ha)
{
cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
@@ -1603,6 +1637,11 @@
groupConfiguration.setRefreshTimeout(refreshTimeout);
+ String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress()
+ : raProperties.getDiscoveryLocalBindAddress();
+
+ groupConfiguration.setLocalBindAdress(localBindAddress);
+
cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
else
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2012-02-03 11:13:56 UTC (rev 12071)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2012-02-03 11:21:52 UTC (rev 12072)
@@ -101,6 +101,12 @@
" </config-property>\n" +
" <config-property>\n" +
" <description>The discovery group address</description>\n" +
+ " <config-property-name>DiscoveryLocalBindAddress</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The discovery group local bind address</description>\n" +
" <config-property-name>DiscoveryAddress</config-property-name>\n" +
" <config-property-type>java.lang.String</config-property-type>\n" +
" <config-property-value></config-property-value>\n" +
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2012-02-03 11:13:56 UTC (rev 12071)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/ResourceAdapterTest.java 2012-02-03 11:21:52 UTC (rev 12072)
@@ -263,9 +263,13 @@
ConnectionFactoryProperties connectionFactoryProperties = new ConnectionFactoryProperties();
connectionFactoryProperties.setDiscoveryAddress("myhost");
connectionFactoryProperties.setDiscoveryPort(5678);
+ connectionFactoryProperties.setDiscoveryLocalBindAddress("newAddress");
HornetQConnectionFactory factory = ra.createHornetQConnectionFactory(connectionFactoryProperties);
HornetQConnectionFactory defaultFactory = ra.getDefaultHornetQConnectionFactory();
Assert.assertNotSame(factory, defaultFactory);
+ Assert.assertEquals(factory.getDiscoveryGroupConfiguration().getLocalBindAddress(), "newAddress");
+ Assert.assertEquals(factory.getDiscoveryGroupConfiguration().getGroupAddress(), "myhost");
+ Assert.assertEquals(factory.getDiscoveryGroupConfiguration().getGroupPort(), 5678);
}
public void testCreateConnectionFactoryMultipleConnectors()
12 years, 11 months
JBoss hornetq SVN: r12071 - in trunk: hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-03 06:13:56 -0500 (Fri, 03 Feb 2012)
New Revision: 12071
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
Log:
https://issues.jboss.org/browse/HORNETQ-828 - fix blocking issue with ra and recovery
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2012-02-03 10:55:59 UTC (rev 12070)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2012-02-03 11:13:56 UTC (rev 12071)
@@ -385,6 +385,10 @@
throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
}
+ if(failingOver)
+ {
+ unlock();
+ }
closed = true;
}
Modified: trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2012-02-03 10:55:59 UTC (rev 12070)
+++ trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2012-02-03 11:13:56 UTC (rev 12071)
@@ -76,6 +76,12 @@
*/
private HornetQConnectionFactory connectionFactory;
+
+ /**
+ * Connection Factory used if properties are set
+ */
+ private HornetQConnectionFactory recoveryConnectionFactory;
+
/*
* The resource recovery if there is one
* */
@@ -137,6 +143,12 @@
cm);
}
+ if (connectionFactory == null)
+ {
+ connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
+ recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
+ }
return cf;
}
@@ -308,6 +320,7 @@
}
this.ra = (HornetQResourceAdapter)ra;
+ this.ra.setManagedConnectionFactory(this);
}
/**
@@ -752,7 +765,8 @@
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
- resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+ recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
}
return connectionFactory;
}
@@ -804,5 +818,17 @@
{
ra.getRecoveryManager().unRegister(resourceRecovery);
}
+
+ if(connectionFactory != null)
+ {
+ connectionFactory.close();
+ connectionFactory = null;
+ }
+
+ if(recoveryConnectionFactory != null)
+ {
+ recoveryConnectionFactory.close();
+ recoveryConnectionFactory = null;
+ }
}
}
Modified: trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 10:55:59 UTC (rev 12070)
+++ trunk/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 11:13:56 UTC (rev 12071)
@@ -13,10 +13,7 @@
package org.hornetq.ra;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -103,13 +100,17 @@
private final Map<ActivationSpec, HornetQActivation> activations;
private HornetQConnectionFactory defaultHornetQConnectionFactory;
-
+
+ private HornetQConnectionFactory recoveryHornetQConnectionFactory;
+
private TransactionManager tm;
private String unparsedJndiParams;
- RecoveryManager recoveryManager;
+ private RecoveryManager recoveryManager;
+ private final List<HornetQRAManagedConnectionFactory> managedConnectionFactories = new ArrayList<HornetQRAManagedConnectionFactory>();
+
/**
* Constructor
*/
@@ -245,11 +246,21 @@
activations.clear();
+ for (HornetQRAManagedConnectionFactory managedConnectionFactory : managedConnectionFactories)
+ {
+ managedConnectionFactory.stop();
+ }
+
+ managedConnectionFactories.clear();
+
if (defaultHornetQConnectionFactory != null)
{
defaultHornetQConnectionFactory.close();
+ }
- XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+ if(recoveryHornetQConnectionFactory != null)
+ {
+ recoveryHornetQConnectionFactory.close();
}
recoveryManager.stop();
@@ -1356,7 +1367,8 @@
protected void setup() throws HornetQException
{
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
- recoveryManager.register(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+ recoveryHornetQConnectionFactory = createRecoveryHornetQConnectionFactory(raProperties);
+ recoveryManager.register(recoveryHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
public Map<ActivationSpec, HornetQActivation> getActivations()
@@ -1530,6 +1542,106 @@
return cf;
}
+ public HornetQConnectionFactory createRecoveryHornetQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
+ {
+ HornetQConnectionFactory cf;
+ List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames()
+ : raProperties.getParsedConnectorClassNames();
+
+ String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress()
+ : getDiscoveryAddress();
+
+ if (discoveryAddress != null)
+ {
+ Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
+ : getDiscoveryPort();
+
+ if(discoveryPort == null)
+ {
+ discoveryPort = HornetQClient.DEFAULT_DISCOVERY_PORT;
+ }
+
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Creating Recovery Connection Factory on the resource adapter for discovery=" + groupConfiguration);
+ }
+
+ Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
+ : raProperties.getDiscoveryRefreshTimeout();
+ if (refreshTimeout == null)
+ {
+ refreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+ }
+
+ Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout()
+ : raProperties.getDiscoveryInitialWaitTimeout();
+
+ if(initialTimeout == null)
+ {
+ initialTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+ }
+
+ groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+
+ groupConfiguration.setRefreshTimeout(refreshTimeout);
+
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ else
+ if (connectorClassName != null)
+ {
+ TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
+
+ List<Map<String, Object>> connectionParams;
+ if(overrideProperties.getParsedConnectorClassNames() != null)
+ {
+ connectionParams = overrideProperties.getParsedConnectionParameters();
+ }
+ else
+ {
+ connectionParams = raProperties.getParsedConnectionParameters();
+ }
+
+ for (int i = 0; i < connectorClassName.size(); i++)
+ {
+ TransportConfiguration tc;
+ if(connectionParams == null || i >= connectionParams.size())
+ {
+ tc = new TransportConfiguration(connectorClassName.get(i));
+ log.debug("No connector params provided using default");
+ }
+ else
+ {
+ tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
+ }
+
+ transportConfigurations[i] = tc;
+ }
+
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Creating Recovery Connection Factory on the resource adapter for transport=" + transportConfigurations);
+ }
+
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+
+ }
+ else
+ {
+ throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for HornetQ ResourceAdapter Connection Factory");
+ }
+ setParams(cf, overrideProperties);
+
+ //now make sure we are HA in any way
+
+ cf.setReconnectAttempts(0);
+ cf.setInitialConnectAttempts(0);
+ return cf;
+ }
+
public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
final Map<String, Object> overrideConnectionParams)
{
@@ -1723,4 +1835,10 @@
cf.setConnectionLoadBalancingPolicyClassName(val5);
}
}
+
+
+ public void setManagedConnectionFactory(HornetQRAManagedConnectionFactory hornetQRAManagedConnectionFactory)
+ {
+ managedConnectionFactories.add(hornetQRAManagedConnectionFactory);
+ }
}
12 years, 11 months
JBoss hornetq SVN: r12070 - in branches/Branch_2_2_EAP/src/main/org/hornetq: ra and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-03 05:55:59 -0500 (Fri, 03 Feb 2012)
New Revision: 12070
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
https://issues.jboss.org/browse/JBPAPP-8038
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2012-02-03 10:55:59 UTC (rev 12070)
@@ -349,6 +349,10 @@
throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
}
+ if(failingOver)
+ {
+ unlock();
+ }
closed = true;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2012-02-03 10:55:59 UTC (rev 12070)
@@ -77,6 +77,12 @@
*/
private HornetQConnectionFactory connectionFactory;
+
+ /**
+ * Connection Factory used if properties are set
+ */
+ private HornetQConnectionFactory recoveryConnectionFactory;
+
/*
* The resource recovery if there is one
* */
@@ -141,7 +147,8 @@
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
- resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+ recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
}
return cf;
}
@@ -314,6 +321,7 @@
}
this.ra = (HornetQResourceAdapter)ra;
+ this.ra.setManagedConnectionFactory(this);
}
/**
@@ -758,7 +766,8 @@
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
- resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+ recoveryConnectionFactory = ra.createRecoveryHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(recoveryConnectionFactory, null, null);
}
return connectionFactory;
}
@@ -810,5 +819,17 @@
{
ra.getRecoveryManager().unRegister(resourceRecovery);
}
+
+ if(connectionFactory != null)
+ {
+ connectionFactory.close();
+ connectionFactory = null;
+ }
+
+ if(recoveryConnectionFactory != null)
+ {
+ recoveryConnectionFactory.close();
+ recoveryConnectionFactory = null;
+ }
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 10:25:34 UTC (rev 12069)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2012-02-03 10:55:59 UTC (rev 12070)
@@ -13,6 +13,7 @@
package org.hornetq.ra;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
@@ -103,13 +104,17 @@
private final Map<ActivationSpec, HornetQActivation> activations;
private HornetQConnectionFactory defaultHornetQConnectionFactory;
+
+ private HornetQConnectionFactory recoveryHornetQConnectionFactory;
private TransactionManager tm;
private String unparsedJndiParams;
- RecoveryManager recoveryManager;
+ private RecoveryManager recoveryManager;
+ private final List<HornetQRAManagedConnectionFactory> managedConnectionFactories = new ArrayList<HornetQRAManagedConnectionFactory>();
+
/**
* Constructor
*/
@@ -240,7 +245,7 @@
{
if (HornetQResourceAdapter.trace)
{
- HornetQResourceAdapter.log.trace("stop()");
+ HornetQResourceAdapter.log.info("stop()*******************************************************************");
}
for (Map.Entry<ActivationSpec, HornetQActivation> entry : activations.entrySet())
@@ -257,11 +262,21 @@
activations.clear();
+ for (HornetQRAManagedConnectionFactory managedConnectionFactory : managedConnectionFactories)
+ {
+ managedConnectionFactory.stop();
+ }
+
+ managedConnectionFactories.clear();
+
if (defaultHornetQConnectionFactory != null)
{
defaultHornetQConnectionFactory.close();
+ }
- XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+ if(recoveryHornetQConnectionFactory != null)
+ {
+ recoveryHornetQConnectionFactory.close();
}
recoveryManager.stop();
@@ -1368,7 +1383,8 @@
protected void setup() throws HornetQException
{
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
- recoveryManager.register(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
+ recoveryHornetQConnectionFactory = createRecoveryHornetQConnectionFactory(raProperties);
+ recoveryManager.register(recoveryHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
public Map<ActivationSpec, HornetQActivation> getActivations()
@@ -1542,6 +1558,106 @@
return cf;
}
+ public HornetQConnectionFactory createRecoveryHornetQConnectionFactory(final ConnectionFactoryProperties overrideProperties)
+ {
+ HornetQConnectionFactory cf;
+ List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames()
+ : raProperties.getParsedConnectorClassNames();
+
+ String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress()
+ : getDiscoveryAddress();
+
+ if (discoveryAddress != null)
+ {
+ Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
+ : getDiscoveryPort();
+
+ if(discoveryPort == null)
+ {
+ discoveryPort = HornetQClient.DEFAULT_DISCOVERY_PORT;
+ }
+
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Creating Recovery Connection Factory on the resource adapter for discovery=" + groupConfiguration);
+ }
+
+ Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
+ : raProperties.getDiscoveryRefreshTimeout();
+ if (refreshTimeout == null)
+ {
+ refreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+ }
+
+ Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout()
+ : raProperties.getDiscoveryInitialWaitTimeout();
+
+ if(initialTimeout == null)
+ {
+ initialTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+ }
+
+ groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+
+ groupConfiguration.setRefreshTimeout(refreshTimeout);
+
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ else
+ if (connectorClassName != null)
+ {
+ TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
+
+ List<Map<String, Object>> connectionParams;
+ if(overrideProperties.getParsedConnectorClassNames() != null)
+ {
+ connectionParams = overrideProperties.getParsedConnectionParameters();
+ }
+ else
+ {
+ connectionParams = raProperties.getParsedConnectionParameters();
+ }
+
+ for (int i = 0; i < connectorClassName.size(); i++)
+ {
+ TransportConfiguration tc;
+ if(connectionParams == null || i >= connectionParams.size())
+ {
+ tc = new TransportConfiguration(connectorClassName.get(i));
+ log.debug("No connector params provided using default");
+ }
+ else
+ {
+ tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
+ }
+
+ transportConfigurations[i] = tc;
+ }
+
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Creating Recovery Connection Factory on the resource adapter for transport=" + transportConfigurations);
+ }
+
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+
+ }
+ else
+ {
+ throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for HornetQ ResourceAdapter Connection Factory");
+ }
+ setParams(cf, overrideProperties);
+
+ //now make sure we are HA in any way
+
+ cf.setReconnectAttempts(0);
+ cf.setInitialConnectAttempts(0);
+ return cf;
+ }
+
public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
final Map<String, Object> overrideConnectionParams)
{
@@ -1735,4 +1851,9 @@
cf.setConnectionLoadBalancingPolicyClassName(val5);
}
}
+
+ public void setManagedConnectionFactory(HornetQRAManagedConnectionFactory hornetQRAManagedConnectionFactory)
+ {
+ managedConnectionFactories.add(hornetQRAManagedConnectionFactory);
+ }
}
12 years, 11 months
JBoss hornetq SVN: r12069 - tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-03 05:25:34 -0500 (Fri, 03 Feb 2012)
New Revision: 12069
Added:
tags/HornetQ_2_2_11_AS7_Final/
Log:
release for 2.2.11.Final
12 years, 11 months
JBoss hornetq SVN: r12068 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-01 11:50:29 -0500 (Wed, 01 Feb 2012)
New Revision: 12068
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Restore (newer) waitForBackup() code
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-01 16:50:09 UTC (rev 12067)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-01 16:50:29 UTC (rev 12068)
@@ -279,29 +279,24 @@
return sf;
}
- protected void waitForBackup(ClientSessionFactoryInternal sf, long seconds) throws Exception
- {
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (sf.getBackupConnector() == null)
- {
- try
- {
- Thread.sleep(100);
- } catch (InterruptedException e)
- {
- // ignore
- }
- if (sf.getBackupConnector() != null)
- {
- break;
- } else if (System.currentTimeMillis() > (time + toWait))
- {
- fail("backup server never started");
- }
- }
- System.out.println("sf.getBackupConnector() = " + sf.getBackupConnector());
- }
+ /**
+ * Waits for backup to be in the "started" state and to finish synchronization with its live.
+ * @param sessionFactory
+ * @param seconds
+ * @throws Exception
+ */
+ protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, int seconds) throws Exception
+ {
+ final HornetQServerImpl actualServer = (HornetQServerImpl)backupServer.getServer();
+ if (actualServer.getConfiguration().isSharedStore())
+ {
+ waitForServer(actualServer);
+ }
+ else
+ {
+ waitForRemoteBackup(sessionFactory, seconds, true, actualServer);
+ }
+ }
/**
* @param sessionFactory
12 years, 11 months
JBoss hornetq SVN: r12067 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration: cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-01 11:50:09 -0500 (Wed, 01 Feb 2012)
New Revision: 12067
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Improve tearDown so as to avoid hanging tests.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2012-02-01 16:49:50 UTC (rev 12066)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2012-02-01 16:50:09 UTC (rev 12067)
@@ -71,7 +71,7 @@
* A PagingTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
+ *
* Created Dec 5, 2008 8:25:58 PM
*
*
@@ -79,7 +79,8 @@
public class PagingTest extends ServiceTestBase
{
private ServerLocator locator;
-
+ private HornetQServer server;
+ private ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
public PagingTest(final String name)
@@ -126,8 +127,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
- config,
+ server =
+ createServer(true, config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
@@ -140,13 +141,13 @@
final int messagesPerTX = numberOfMessages / numberOfTX;
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -340,7 +341,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -353,7 +355,7 @@
final int numberOfMessages = 500;
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -361,7 +363,7 @@
locator.setProducerWindowSize(-1);
locator.setMinLargeMessageSize(1024 * 1024);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -427,7 +429,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -437,15 +440,13 @@
final int numberOfMessages = 1000;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -544,17 +545,6 @@
Thread.sleep(100);
}
assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
@@ -570,7 +560,7 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -580,17 +570,15 @@
final int numberOfMessages = 1000;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
- ClientSession session = sf.createSession(false, false, false);
+ ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -778,18 +766,6 @@
sessionConsumer.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testMissingTXEverythingAcked() throws Exception
@@ -800,7 +776,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -816,13 +793,13 @@
try
{
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -927,7 +904,7 @@
server.stop();
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -966,10 +943,6 @@
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
sess.close();
-
- locator.close();
-
- server.stop();
}
public void testMissingTXEverythingAcked2() throws Exception
@@ -980,7 +953,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -996,13 +970,13 @@
try
{
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1086,10 +1060,8 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -1129,13 +1101,6 @@
{
Thread.sleep(100);
}
-
- locator.close();
- }
- finally
- {
- server.stop();
- }
}
public void testTwoQueuesOneNoRouting() throws Exception
@@ -1148,7 +1113,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1158,15 +1124,13 @@
final int numberOfMessages = 1000;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1235,22 +1199,6 @@
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testSendReceivePagingPersistent() throws Exception
@@ -1281,7 +1229,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1336,6 +1285,7 @@
this.queue = queue;
}
+ @Override
public void run()
{
try
@@ -1362,13 +1312,13 @@
try
{
{
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1441,7 +1391,7 @@
tcount1.start();
tcount2.start();
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = createSessionFactory(locator);
final AtomicInteger errors = new AtomicInteger(0);
@@ -1455,6 +1405,7 @@
threads[start - 1] = new Thread()
{
+ @Override
public void run()
{
try
@@ -1578,7 +1529,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1597,16 +1549,14 @@
bb.put(getSamplebyte(j));
}
- try
{
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1652,13 +1602,14 @@
new HashMap<String, AddressSettings>());
server.start();
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = createSessionFactory(locator);
final AtomicInteger errors = new AtomicInteger(0);
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -1734,20 +1685,8 @@
assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
- }
- finally
- {
- try
- {
- server.stop();
}
- catch (Throwable ignored)
- {
- }
- }
- }
-
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
@@ -1758,7 +1697,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1769,16 +1709,13 @@
final int numberOfIntegers = 256;
final int numberOfMessages = 1000;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1867,22 +1804,6 @@
consumer.close();
session.close();
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
private void assertBodiesEqual(final byte[] body, final HornetQBuffer buffer)
@@ -1900,7 +1821,7 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
*/
public void testDepageDuringTransaction() throws Exception
{
@@ -1908,7 +1829,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1916,14 +1838,12 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2015,22 +1935,6 @@
consumer.close();
session.close();
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
/**
@@ -2039,9 +1943,9 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
* Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
- *
+ *
*/
public void testDepageDuringTransaction2() throws Exception
{
@@ -2050,7 +1954,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2058,14 +1963,12 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
byte[] body = new byte[MESSAGE_SIZE];
@@ -2175,30 +2078,16 @@
session.close();
- sf.close();
-
- locator.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
public void testDepageDuringTransaction3() throws Exception
{
clearData();
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2206,14 +2095,12 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
byte[] body = new byte[MESSAGE_SIZE];
@@ -2304,23 +2191,7 @@
consumer.close();
- sessionNonTX.close();
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ sessionNonTX.close();
}
public void testDepageDuringTransaction4() throws Exception
@@ -2329,7 +2200,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2344,21 +2216,18 @@
final int numberOfMessages = 10000;
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(false);
+ sf = createSessionFactory(locator);
- try
- {
-
- final ClientSessionFactory sf = createSessionFactory(locator);
-
final byte[] body = new byte[MESSAGE_SIZE];
Thread producerThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionProducer = null;
@@ -2439,17 +2308,6 @@
sf.close();
assertEquals(0, errors.get());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
public void testOrderingNonTX() throws Exception
@@ -2458,7 +2316,8 @@
Configuration config = createDefaultConfig();
- final HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_SIZE * 2,
@@ -2473,12 +2332,10 @@
final int numberOfMessages = 2000;
- try
- {
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
final CountDownLatch ready = new CountDownLatch(1);
@@ -2486,6 +2343,7 @@
Thread producerThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionProducer = null;
@@ -2568,18 +2426,6 @@
producerThread.join();
assertEquals(0, errors.get());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testPageOnSchedulingNoRestart() throws Exception
@@ -2600,7 +2446,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2612,14 +2459,11 @@
final int numberOfBytes = 1024;
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2713,18 +2557,6 @@
consumer.close();
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testRollbackOnSend() throws Exception
@@ -2733,7 +2565,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2745,14 +2578,11 @@
final int numberOfMessages = 10;
- try
- {
-
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2786,18 +2616,6 @@
Assert.assertNull(consumer.receiveImmediate());
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testCommitOnSend() throws Exception
@@ -2806,7 +2624,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2818,14 +2637,11 @@
final int numberOfMessages = 500;
- try
- {
-
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2889,18 +2705,6 @@
}
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testParialConsume() throws Exception
@@ -2909,7 +2713,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2919,14 +2724,11 @@
final int numberOfMessages = 1000;
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3019,19 +2821,7 @@
session.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
public void testPageMultipleDestinations() throws Exception
{
internalTestPageMultipleDestinations(false);
@@ -3055,20 +2845,17 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 10 * 1024, settings);
+ server = createServer(true, config, 1024, 10 * 1024, settings);
server.start();
final int numberOfMessages = 1000;
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3169,19 +2956,6 @@
.getPagingManager()
.getPageStore(PagingTest.ADDRESS)
.getAddressSize());
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testDropMessagesExpiring() throws Exception
@@ -3197,18 +2971,15 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ server = createServer(true, config, 1024, 1024 * 1024, settings);
server.start();
final int numberOfMessages = 30000;
- try
- {
-
locator.setAckBatchSize(0);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession();
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3269,18 +3040,6 @@
}
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
private void internalTestPageMultipleDestinations(final boolean transacted) throws Exception
@@ -3291,22 +3050,19 @@
int NUMBER_OF_MESSAGES = 2;
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
-
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, !transacted, true, false, 0);
for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
@@ -3379,26 +3135,14 @@
Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getMessageCount());
Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getDeliveringCount());
}
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testSyncPage() throws Exception
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3498,7 +3242,8 @@
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3506,9 +3251,7 @@
server.start();
- try
- {
- server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+ server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);
@@ -3575,21 +3318,6 @@
assertTrue(pageUp.await(10, TimeUnit.SECONDS));
assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-
- server.stop();
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testPagingOneDestinationOnly() throws Exception
@@ -3609,13 +3337,11 @@
addresses.put(PAGED_ADDRESS.toString(), pagedDestination);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
- try
- {
- server.start();
+ server.start();
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, false);
@@ -3682,15 +3408,6 @@
Assert.assertNull(consumerPaged.receiveImmediate());
session.close();
-
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testPagingDifferentSizes() throws Exception
@@ -3718,14 +3435,11 @@
addresses.put(PAGED_ADDRESS_B.toString(), pagedDestinationB);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
+ server.start();
- try
- {
- server.start();
+ sf = createSessionFactory(locator);
- ClientSessionFactory sf = createSessionFactory(locator);
-
ClientSession session = sf.createSession(false, true, false);
session.createQueue(PAGED_ADDRESS_A, PAGED_ADDRESS_A, true);
@@ -3813,15 +3527,6 @@
consumerB.close();
session.close();
-
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testPageAndDepageRapidly() throws Exception
@@ -3835,7 +3540,7 @@
config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(10 * 1024 * 1024);
- HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
+ server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
server.start();
@@ -3843,15 +3548,13 @@
final int numberOfMessages = 200;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(true, true);
@@ -3863,6 +3566,7 @@
Thread consumeThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionConsumer = null;
@@ -3947,24 +3651,8 @@
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
}
- catch (Throwable ignored)
- {
- }
- }
- }
-
public void testTwoQueuesDifferentFilters() throws Exception
{
boolean persistentMessages = true;
@@ -3975,7 +3663,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3984,11 +3673,8 @@
server.start();
final int numberOfMessages = 200;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -3997,7 +3683,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4071,21 +3757,6 @@
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
public void testTwoQueues() throws Exception
@@ -4098,7 +3769,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -4107,11 +3779,8 @@
server.start();
final int numberOfMessages = 1000;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -4120,7 +3789,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4191,21 +3860,6 @@
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
public void testTwoQueuesConsumeOneRestart() throws Exception
@@ -4218,7 +3872,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -4227,11 +3882,8 @@
server.start();
final int numberOfMessages = 1000;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -4240,7 +3892,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4298,9 +3950,9 @@
assertNull(consumer.receiveImmediate());
consumer.close();
-
+
long timeout = System.currentTimeMillis() + 10000;
-
+
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
// It's async, so need to wait a bit for it happening
@@ -4317,22 +3969,7 @@
server.stop();
server.start();
-
- sf.close();
-
- locator.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
public void testDLAOnLargeMessageAndPaging() throws Exception
{
@@ -4349,12 +3986,10 @@
dla.setDeadLetterAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
server.start();
- ServerLocator locator = null;
- ClientSessionFactory sf = null;
ClientSession session = null;
try
{
@@ -4564,15 +4199,6 @@
finally
{
session.close();
- sf.close();
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
}
}
@@ -4592,21 +4218,19 @@
dla.setExpiryAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
server.start();
final int MESSAGE_SIZE = 20;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4718,19 +4342,7 @@
assertFalse(pgStoreAddress.isPaging());
- session.close();
- }
- finally
- {
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
+ session.close();
}
// Package protected ---------------------------------------------
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2012-02-01 16:49:50 UTC (rev 12066)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2012-02-01 16:50:09 UTC (rev 12067)
@@ -46,6 +46,7 @@
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.CountDownSessionFailureListener;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -94,7 +95,7 @@
}
protected ClientSession createSession(ClientSessionFactory sf,
- boolean autoCommitSends,
+ boolean autoCommitSends,
boolean autoCommitAcks,
int ackBatchSize) throws Exception
{
@@ -116,7 +117,7 @@
boolean autoCommitSends,
boolean autoCommitAcks) throws Exception
{
- return sf.createSession(xa, autoCommitSends, autoCommitAcks);
+ return addClientSession(sf.createSession(xa, autoCommitSends, autoCommitAcks));
}
// https://issues.jboss.org/browse/HORNETQ-685
@@ -263,7 +264,7 @@
session.close();
}
-
+
public void testTimeoutOnFailoverConsumeBlocked() throws Exception
{
locator.setCallTimeout(5000);
@@ -303,6 +304,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
ClientMessage message = null;
@@ -333,7 +335,7 @@
{
endLatch.countDown();
}
-
+
if (message.getBooleanProperty("end"))
{
break;
@@ -346,7 +348,7 @@
}
}
-
+
private ClientMessage getMessage()
{
while (true)
@@ -482,7 +484,7 @@
locator.setAckBatchSize(0);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -490,8 +492,6 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -544,68 +544,35 @@
System.out.println("received.size() = " + received.size());
session.close();
- sf.close();
-
Assert.assertTrue(retry <= 5);
+ }
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void createClientSessionFactory() throws Exception
+ {
+ sf = (ClientSessionFactoryInternal)createSessionFactory(locator);
}
public void testNonTransacted() throws Exception
{
- ClientSessionFactoryInternal sf;
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.close();
sf.close();
@@ -615,14 +582,19 @@
Assert.assertEquals(0, sf.numConnections());
}
- public void testConsumeTransacted() throws Exception
+ private void createSessionFactory() throws Exception
{
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ }
+ public void testConsumeTransacted() throws Exception
+ {
+ createSessionFactory();
+
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -631,17 +603,8 @@
final int numMessages = 10;
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(true);
+ sendMessages(session, producer, numMessages);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.commit();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -651,6 +614,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
+ assertNotNull("Just crashed? " + (i == 6) + " " + i, message);
message.acknowledge();
@@ -683,7 +647,7 @@
{
ClientMessage message = consumer.receive(1000);
- assertNotNull(message);
+ assertNotNull("Expecting message #" + i, message);
message.acknowledge();
}
@@ -691,12 +655,6 @@
session.commit();
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
// https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -707,7 +665,7 @@
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
// Crash live server
crash();
@@ -718,72 +676,30 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(true);
+ sendMessages(session, producer, NUM_MESSAGES);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesSentSoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -805,15 +721,9 @@
ClientMessage message = consumer.receiveImmediate();
- Assert.assertNull(message);
+ Assert.assertNull("message should be null! Was: " + message, message);
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
/**
@@ -822,31 +732,16 @@
*/
public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -879,45 +774,24 @@
message = consumer.receiveImmediate();
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting a message", message);
Assert.assertEquals(counter, message.getIntProperty("counter").intValue());
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.commit();
crash(session);
@@ -932,45 +806,19 @@
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -982,19 +830,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
// messages will be delivered to the consumer when the session is committed
session.commit();
@@ -1012,64 +849,25 @@
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesConsumedSoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, false, false);
@@ -1078,19 +876,8 @@
session2.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
Assert.assertTrue(session2.isRollbackOnly());
@@ -1105,34 +892,20 @@
{
Assert.assertEquals(HornetQException.TRANSACTION_ROLLED_BACK, e.getCode());
}
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session1.createMessage(true);
@@ -1179,7 +952,7 @@
{
ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting message " + i, message);
assertMessageBody(i, message);
@@ -1191,26 +964,12 @@
session2.commit();
Assert.assertNull(consumer.receiveImmediate());
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1219,21 +978,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
try
@@ -1254,47 +1004,26 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ final ClientSession session = createSession(sf, true, false, false);
- ClientSession session = createSession(sf, true, false, false);
-
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -1308,6 +1037,7 @@
catch (XAException e)
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
+ // XXXX session.rollback();
}
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1318,24 +1048,15 @@
Assert.assertNull(message);
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ producer.close();
+ consumer.close();
}
// This might happen if 1PC optimisation kicks in
public void testXAMessagesSentSoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1344,21 +1065,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -1381,24 +1093,12 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1407,21 +1107,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
@@ -1438,66 +1129,27 @@
session.start(xid2, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.end(xid2, XAResource.TMSUCCESS);
session.prepare(xid2);
session.commit(xid2, false);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1510,19 +1162,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
try
@@ -1535,45 +1176,20 @@
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1586,19 +1202,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
crash(session2);
@@ -1613,45 +1218,20 @@
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
// 1PC optimisation
public void testXAMessagesConsumedSoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
-
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1664,19 +1244,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
// session2.prepare(xid);
@@ -1698,12 +1267,6 @@
session1.close();
session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testCreateNewFactoryAfterFailover() throws Exception
@@ -1711,7 +1274,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sendAndConsume(sf, true);
@@ -1721,27 +1284,15 @@
Thread.sleep(5000);
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = sendAndConsume(sf, true);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testFailoverMultipleSessionsWithConsumers() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
final int numSessions = 5;
final int numConsumersPerSession = 5;
@@ -1772,19 +1323,9 @@
ClientProducer producer = sendSession.createProducer(FailoverTestBase.ADDRESS);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = sendSession.createMessage(true);
+ sendMessages(sendSession, producer, NUM_MESSAGES);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
Set<ClientSession> sessionSet = sessionConsumerMap.keySet();
ClientSession[] sessions = new ClientSession[sessionSet.size()];
sessionSet.toArray(sessions);
@@ -1799,18 +1340,7 @@
{
for (ClientConsumer consumer : consumerList)
{
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
+ receiveMessages(consumer);
}
}
@@ -1818,14 +1348,6 @@
{
session.close();
}
-
- sendSession.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
/*
@@ -1833,105 +1355,56 @@
*/
public void testFailWithBrowser() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
-
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer, 0, NUM_MESSAGES, false);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- }
-
crash(session);
+ receiveDurableMessages(consumer);
+ }
+
+ private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer) throws Exception
+ {
for (int i = 0; i < NUM_MESSAGES; i++)
{
- // Only the persistent messages will survive
-
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
+ // some are durable, some are not!
+ boolean durable = isDurable(i);
+ ClientMessage message = session.createMessage(durable);
+ setBody(i, message);
+ message.putIntProperty("counter", i);
+ producer.send(message);
}
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
+ // Receive MSGs but don't ack!
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1947,31 +1420,29 @@
// Should get the same ones after failover since we didn't ack
+ receiveDurableMessages(consumer);
+ }
+
+ private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
+ {
for (int i = 0; i < NUM_MESSAGES; i++)
{
// Only the persistent messages will survive
- if (i % 2 == 0)
+ if (isDurable(i))
{
ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
+ Assert.assertNotNull("expecting durable msg " + i, message);
assertMessageBody(i, message);
-
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
message.acknowledge();
}
}
+ }
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private boolean isDurable(int i)
+ {
+ return i % 2 == 0;
}
public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
@@ -1980,7 +1451,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = createSession(sf, true, true, 0);
@@ -1988,43 +1459,21 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session);
// Send some more
for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
{
- ClientMessage message = session.createMessage(i % 2 == 0);
+ ClientMessage message = session.createMessage(isDurable(i));
setBody(i, message);
@@ -2034,27 +1483,12 @@
}
// Should get the same ones after failover since we didn't ack
+ receiveMessages(consumer, NUM_MESSAGES, NUM_MESSAGES * 2, true);
+ }
- for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void receiveMessages(ClientConsumer consumer) throws HornetQException
+ {
+ receiveMessages(consumer, 0, NUM_MESSAGES, true);
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -2083,7 +1517,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = createSession(sf, true, true, 0);
@@ -2098,45 +1532,15 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
crash(session);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ receiveMessages(consumer);
}
public void _testForceBlockingReturn() throws Exception
@@ -2145,7 +1549,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
// Add an interceptor to delay the send method so we can get time to cause failover before it returns
@@ -2194,12 +1598,6 @@
Assert.assertEquals(sender.e.getCode(), HornetQException.UNBLOCKED);
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -2209,14 +1607,15 @@
locator.setReconnectAttempts(-1);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
final ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-
+
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
String txID = "my-tx-id";
@@ -2267,7 +1666,7 @@
}
catch (HornetQException e2)
{
-
+ throw new RuntimeException(e2);
}
}
@@ -2279,8 +1678,8 @@
Committer committer = new Committer();
- // Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
- // with transaction rolled back
+ // Commit will occur, but response will never get back, connection is failed, and commit
+ // should be unblocked with transaction rolled back
committer.start();
@@ -2293,7 +1692,7 @@
committer.join();
- Assert.assertFalse(committer.failed);
+ Assert.assertFalse("second attempt succeed?", committer.failed);
session.close();
@@ -2324,40 +1723,22 @@
try
{
session2.commit();
+ fail("expecting DUPLICATE_ID_REJECTED exception");
}
catch (HornetQException e)
{
- assertEquals(HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
+ assertEquals(e.getMessage(), HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
}
ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
session2.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testCommitDidNotOccurUnblockedAndResend() throws Exception
@@ -2366,27 +1747,17 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-
+
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ sendMessages(session, producer,NUM_MESSAGES);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(true);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
class Committer extends Thread
{
@Override
@@ -2443,75 +1814,36 @@
producer = session2.createProducer(FailoverTestBase.ADDRESS);
// We now try and resend the messages since we get a transaction rolled back exception
+ sendMessages(session2, producer,NUM_MESSAGES);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session2.createMessage(true);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session2.commit();
ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
session2.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
- Assert.assertNull(message);
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ Assert.assertNull("expecting null message", message);
}
public void testBackupServerNotRemoved() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener implements SessionFailureListener
+ // HORNETQ-720 Disabling test for replicating backups.
+ if (!backupServer.getServer().getConfiguration().isSharedStore())
{
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(HornetQException exception)
- {
- System.out.println("MyListener.beforeReconnect");
- }
+ waitForComponent(backupServer, 1);
+ return;
}
+ locator.setFailoverOnInitialConnection(true);
+ createSessionFactory();
+ CountDownSessionFailureListener listener = new CountDownSessionFailureListener();
ClientSession session = sendAndConsume(sf, true);
- session.addFailureListener(new MyListener());
+ session.addFailureListener(listener);
backupServer.stop();
@@ -2522,7 +1854,7 @@
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", listener.getLatch().await(5, TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -2531,23 +1863,12 @@
setBody(0, message);
producer.send(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testLiveAndBackupLiveComesBack() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -2587,23 +1908,13 @@
setBody(0, message);
producer.send(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
+
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -2646,7 +1957,7 @@
sf.close();
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = createSession(sf);
@@ -2659,14 +1970,6 @@
assertNotNull(cm);
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -2675,25 +1978,12 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
- final CountDownLatch latch = new CountDownLatch(1);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ CountDownSessionFailureListener listener = new CountDownSessionFailureListener();
- class MyListener implements SessionFailureListener
- {
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(HornetQException exception)
- {
- System.out.println("MyListener.beforeReconnect");
- }
- }
-
ClientSession session = sendAndConsume(sf, true);
- session.addFailureListener(new MyListener());
+ session.addFailureListener(listener);
backupServer.stop();
@@ -2702,9 +1992,16 @@
// To reload security or other settings that are read during startup
beforeRestart(backupServer);
+ if (!backupServer.getServer().getConfiguration().isSharedStore())
+ {
+ // XXX
+ // this test would not make sense in the remote replication use case, without the following
+ backupServer.getServer().getConfiguration().setBackup(false);
+ }
+
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", listener.getLatch().await(5, TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -2718,7 +2015,7 @@
sf.close();
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = createSession(sf);
@@ -2731,14 +2028,6 @@
assertNotNull(cm);
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
// Package protected ---------------------------------------------
@@ -2757,25 +2046,6 @@
return TransportConfigurationUtils.getInVMConnector(live);
}
- /**
- * @param i
- * @param message
- */
- protected void assertMessageBody(final int i, final ClientMessage message)
- {
- Assert.assertEquals("message" + i, message.getBodyBuffer().readString());
- }
-
- /**
- * @param i
- * @param message
- * @throws Exception
- */
- protected void setBody(final int i, final ClientMessage message) throws Exception
- {
- message.getBodyBuffer().writeString("message" + i);
- }
-
protected void beforeRestart(TestableServer liveServer)
{
}
12 years, 11 months
JBoss hornetq SVN: r12066 - trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-01 11:49:50 -0500 (Wed, 01 Feb 2012)
New Revision: 12066
Modified:
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
Log:
Restore correct documentation of journal format.
Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2012-02-01 16:49:32 UTC (rev 12065)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2012-02-01 16:49:50 UTC (rev 12066)
@@ -15,26 +15,23 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
/**
- * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
- * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
- * <p>The element-summary will then have</p>
- * <p>FileID1, 10</p>
- * <p>FileID2, 10</p>
- * <p>FileID3, 10</p>
- *
- * <br>
- * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
- * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
- * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
- *
+ * <p>
+ * A transaction record (Commit or Prepare) holds the number of elements the transaction has in
+ * the current file.
+ * <p>
+ * While loading the {@link JournalFile}, the number of operations found is matched against this
+ * number. If for any reason there are missing operations, the transaction will be ignored.
+ * <p>
+ * We can't just use a global counter as reclaiming could delete files after the transaction was
+ * successfully committed. That also means not having a whole file on journal-reload doesn't mean we
+ * have to invalidate the transaction.
+ * <p>
+ * The commit operation itself is not included in this total.
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public class JournalCompleteRecordTX extends JournalInternalRecord
{
@@ -70,7 +67,7 @@
}
buffer.writeInt(fileID);
-
+
buffer.writeByte(compactCount);
buffer.writeLong(txID);
12 years, 11 months