JBoss hornetq SVN: r12273 - in branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396: hornetq-rest and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-08 18:21:53 -0500 (Thu, 08 Mar 2012)
New Revision: 12273
Modified:
branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml
branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml
branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties
Log:
changing version names for JBPAPP-8396
Modified: branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml
===================================================================
--- branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml 2012-03-08 23:18:51 UTC (rev 12272)
+++ branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml 2012-03-08 23:21:53 UTC (rev 12273)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.10.EAP.GA"/>
+ <property name="hornetq.version" value="2.2.10.EAP.GA-JBPAPP_6277"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml
===================================================================
--- branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml 2012-03-08 23:18:51 UTC (rev 12272)
+++ branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml 2012-03-08 23:21:53 UTC (rev 12273)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.10.EAP.GA</hornetq.version>
+ <hornetq.version>2.2.10.EAP.GA-JBPAPP_6277</hornetq.version>
</properties>
<licenses>
Modified: branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties
===================================================================
--- branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties 2012-03-08 23:18:51 UTC (rev 12272)
+++ branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties 2012-03-08 23:21:53 UTC (rev 12273)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_10_EAP_GA
+hornetq.version.versionName=HQ_2_2_10_EAP_GA_JBPAPP_6277
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=10
12 years, 2 months
JBoss hornetq SVN: r12272 - branches/one-offs.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-08 18:18:51 -0500 (Thu, 08 Mar 2012)
New Revision: 12272
Added:
branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/
Log:
Creating branch for JBPAPP-8396
12 years, 2 months
JBoss hornetq SVN: r12271 - in branches/Branch_2_2_EAP/src/main/org/hornetq: jms/server/recovery and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-08 17:57:55 -0500 (Thu, 08 Mar 2012)
New Revision: 12271
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
JBPAPP-8377 - Improving recovery registry while avoiding duplicates
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -113,6 +113,18 @@
}
/**
+ * Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
+ * as the cluster topology changes, and no HA backup information is propagated to the client
+ *
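+ * @param ha the HA flag handed to the ServerLocatorImpl constructor
+ * @param transportConfigurations the static list of transport configurations handed to the ServerLocatorImpl constructor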
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, TransportConfiguration... transportConfigurations)
+ {
+ return new ServerLocatorImpl(ha, transportConfigurations);
+ }
+
+ /**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
* The UDP address and port are used to listen for live servers in the cluster
@@ -127,6 +139,20 @@
}
/**
+ * Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
+ *
+ * The UDP address and port are used to listen for live servers in the cluster
+ *
+ * @param ha the HA flag handed to the ServerLocatorImpl constructor
+ * @param groupConfiguration the discovery group configuration handed to the ServerLocatorImpl constructor
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ return new ServerLocatorImpl(ha, groupConfiguration);
+ }
+
+ /**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The initial list of servers supplied in this method is simply to make an initial connection to the cluster, once that connection is made, up to date
* cluster topology information is downloaded and automatically updated whenever the cluster topology changes. If the topology includes backup servers
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -13,14 +13,9 @@
package org.hornetq.jms.server.recovery;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.jms.client.HornetQConnectionFactory;
import org.jboss.tm.XAResourceRecoveryRegistry;
/**
@@ -28,7 +23,6 @@
* we verify if a given connection factory already have a recovery registered
*
* @author Clebert
- * @author Andy Taylor
*
*
*/
@@ -40,7 +34,7 @@
// Attributes ----------------------------------------------------
- private static Set<HornetQResourceRecovery> configSet = new HashSet<HornetQResourceRecovery>();
+ private static HashMap<XARecoveryConfig, HornetQResourceRecovery> configSet = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
// Static --------------------------------------------------------
@@ -54,32 +48,46 @@
{
synchronized (configSet)
{
- HornetQResourceRecovery usedInstance = locateSimilarResource(resourceRecovery);
- if (usedInstance == null)
+ HornetQResourceRecovery recovery = configSet.get(resourceRecovery.getConfig());
+
+ if (recovery == null)
{
+ recovery = resourceRecovery;
if (log.isDebugEnabled())
{
- log.debug("Adding " + resourceRecovery.getConfig() + " resource = " + resourceRecovery);
+ log.debug("Registering a new recovery for " + recovery.getConfig() + ", recovery = " + resourceRecovery);
}
- usedInstance = resourceRecovery;
- configSet.add(usedInstance);
- getTMRegistry().addXAResourceRecovery(usedInstance);
+ configSet.put(resourceRecovery.getConfig(), resourceRecovery);
+ getTMRegistry().addXAResourceRecovery(recovery);
}
- usedInstance.incrementUsage();
- return usedInstance;
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.info("Return pre-existent recovery=" + recovery + " for configuration = " + resourceRecovery.getConfig());
+ }
+ }
+ recovery.incrementUsage();
+ return recovery;
}
}
- public synchronized void unRegister(final HornetQResourceRecovery resourceRecovery)
+ public void unRegister(final HornetQResourceRecovery resourceRecovery)
{
synchronized (configSet)
{
- // The same resource could have been reused by more than one resource manager or factory
- if (resourceRecovery.decrementUsage() == 0)
+ HornetQResourceRecovery recFound = configSet.get(resourceRecovery.getConfig());
+
+ if (recFound != null && recFound.decrementUsage() == 0)
{
- getTMRegistry().removeXAResourceRecovery(resourceRecovery);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Removing recovery information for " + recFound + " as all the deployments were already removed");
+ }
+ getTMRegistry().removeXAResourceRecovery(recFound);
+ configSet.remove(resourceRecovery);
}
}
}
@@ -89,77 +97,6 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
- private static HornetQResourceRecovery locateSimilarResource(HornetQResourceRecovery resourceInput)
- {
- HornetQConnectionFactory factory = resourceInput.getConfig().getFactory();
-
- TransportConfiguration[] transportConfigurations = resourceInput.getConfig().getFactory().getServerLocator()
- .getStaticTransportConfigurations();
+ // Inner classes -------------------------------------------------
-
- if (log.isTraceEnabled())
- {
- log.trace("############################################## looking for a place on " + Arrays.toString(transportConfigurations));
- }
-
- for (HornetQResourceRecovery resourceScan : configSet)
- {
- XARecoveryConfig xaRecoveryConfig = resourceScan.getConfig();
-
- if (transportConfigurations != null)
- {
- TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator()
- .getStaticTransportConfigurations();
-
- if (log.isTraceEnabled())
- {
- log.trace("Checking " + Arrays.toString(transportConfigurations) + " against " + Arrays.toString(xaConfigurations));
- }
-
- if (xaConfigurations == null)
- {
- continue;
- }
- if (transportConfigurations.length != xaConfigurations.length)
- {
- if (log.isTraceEnabled())
- {
- log.trace(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of size");
- }
- continue;
- }
- boolean theSame = true;
- for (int i = 0; i < transportConfigurations.length; i++)
- {
- TransportConfiguration tc = transportConfigurations[i];
- TransportConfiguration xaTc = xaConfigurations[i];
- if (!tc.equals(xaTc))
- {
- log.info(Arrays.toString(transportConfigurations) + " != " + Arrays.toString(xaConfigurations) + " because of " + tc + " != " + xaTc);
- theSame = false;
- break;
- }
- }
- if (theSame)
- {
- return resourceScan;
- }
- } else
- {
- DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory()
- .getServerLocator().getDiscoveryGroupConfiguration();
- if (discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
- {
- return resourceScan;
- }
- }
- }
-
- return null;
-
- }
-
- // Inner classes -------------------------------------------------
-
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -316,7 +317,14 @@
try
{
- serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
+ if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
+ {
+ serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getDiscoveryConfiguration());
+ }
+ else
+ {
+ serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getTransportConfig());
+ }
serverLocator.disableFinalizeCheck();
csf = serverLocator.createSessionFactory();
if (xaRecoveryConfig.getUsername() == null)
@@ -334,10 +342,29 @@
1);
}
}
- catch (HornetQException e)
+ catch (Throwable e)
{
+ log.warn("Can't connect to " + xaRecoveryConfig + " on auto-generated resource recovery", e);
+ if (log.isDebugEnabled())
+ {
+ log.debug(e.getMessage(), e);
+ }
+
+ try
+ {
+ if (cs != null) cs.close();
+ if (serverLocator != null) serverLocator.close();
+ }
+ catch (Throwable ignored)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace(e.getMessage(), ignored);
+ }
+ }
continue;
}
+
cs.addFailureListener(this);
synchronized (HornetQXAResourceWrapper.lock)
@@ -392,9 +419,9 @@
oldServerLocator.close();
}
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
- HornetQXAResourceWrapper.log.trace("Ignored error during close", ignored);
+ HornetQXAResourceWrapper.log.debug("Ignored error during close", ignored);
}
}
@@ -410,10 +437,9 @@
{
log.warn(e.getMessage(), e);
- if (e.errorCode == XAException.XA_RETRY)
- {
- close();
- }
+
+ // If any exception happened, we close the connection so we may start fresh
+ close();
throw e;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -13,6 +13,10 @@
package org.hornetq.jms.server.recovery;
+import java.util.Arrays;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
/**
@@ -27,22 +31,46 @@
*/
public class XARecoveryConfig
{
- private final HornetQConnectionFactory hornetQConnectionFactory;
+
+ private final boolean ha;
+ private final TransportConfiguration[] transportConfiguration;
+ private final DiscoveryGroupConfiguration discoveryConfiguration;
private final String username;
private final String password;
- public XARecoveryConfig(HornetQConnectionFactory hornetQConnectionFactory, String username, String password)
+ public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
{
- this.hornetQConnectionFactory = hornetQConnectionFactory;
+ this.transportConfiguration = transportConfiguration;
+ this.discoveryConfiguration = null;
this.username = username;
this.password = password;
+ this.ha = ha;
}
- public HornetQConnectionFactory getHornetQConnectionFactory()
+ public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
{
- return hornetQConnectionFactory;
+ this.discoveryConfiguration = discoveryConfiguration;
+ this.transportConfiguration = null;
+ this.username = username;
+ this.password = password;
+ this.ha = ha;
}
+
+ public boolean isHA()
+ {
+ return ha;
+ }
+ public DiscoveryGroupConfiguration getDiscoveryConfiguration()
+ {
+ return discoveryConfiguration;
+ }
+
+ public TransportConfiguration[] getTransportConfig()
+ {
+ return transportConfiguration;
+ }
+
public String getUsername()
{
return username;
@@ -53,28 +81,43 @@
return password;
}
- public HornetQConnectionFactory getFactory()
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
{
- return hornetQConnectionFactory;
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode());
+ result = prime * result + Arrays.hashCode(transportConfiguration);
+ return result;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
@Override
- public boolean equals(Object o)
+ public boolean equals(Object obj)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- XARecoveryConfig that = (XARecoveryConfig) o;
-
- if (hornetQConnectionFactory != null ? !hornetQConnectionFactory.equals(that.hornetQConnectionFactory) : that.hornetQConnectionFactory != null)
+ if (this == obj)
+ return true;
+ if (obj == null)
return false;
- if (password != null ? !password.equals(that.password) : that.password != null) return false;
- if (username != null ? !username.equals(that.username) : that.username != null) return false;
-
+ if (getClass() != obj.getClass())
+ return false;
+ XARecoveryConfig other = (XARecoveryConfig)obj;
+ if (discoveryConfiguration == null)
+ {
+ if (other.discoveryConfiguration != null)
+ return false;
+ }
+ else if (!discoveryConfiguration.equals(other.discoveryConfiguration))
+ return false;
+ if (!Arrays.equals(transportConfiguration, other.transportConfiguration))
+ return false;
return true;
}
-
-
/* (non-Javadoc)
* @see java.lang.Object#toString()
@@ -82,20 +125,12 @@
@Override
public String toString()
{
- return "XARecoveryConfig [hornetQConnectionFactory=" + hornetQConnectionFactory +
+ return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) +
+ ", discoveryConfiguration = " + discoveryConfiguration +
", username=" +
username +
", password=" +
password +
"]";
}
-
- @Override
- public int hashCode()
- {
- int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
- result = 31 * result + (username != null ? username.hashCode() : 0);
- result = 31 * result + (password != null ? password.hashCode() : 0);
- return result;
- }
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-08 16:08:38 UTC (rev 12270)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-08 22:57:55 UTC (rev 12271)
@@ -54,8 +54,7 @@
{
log.debug("registering recovery for factory : " + factory);
- XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
- HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ HornetQResourceRecovery resourceRecovery = newResourceRecovery(factory, userName, password);
if (registry != null)
{
@@ -69,6 +68,31 @@
return resourceRecovery;
}
+ /**
+ * @param factory
+ * @param userName
+ * @param password
+ * @return
+ */
+ private HornetQResourceRecovery newResourceRecovery(HornetQConnectionFactory factory,
+ String userName,
+ String password)
+ {
+ XARecoveryConfig xaRecoveryConfig;
+
+ if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null)
+ {
+ xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password);
+ }
+ else
+ {
+ xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password);
+ }
+
+ HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ return resourceRecovery;
+ }
+
public void unRegister(HornetQResourceRecovery resourceRecovery)
{
registry.unRegister(resourceRecovery);
12 years, 2 months
JBoss hornetq SVN: r12270 - in trunk: hornetq-commons/src/main/java/org/hornetq/core/logging and 14 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-08 11:08:38 -0500 (Thu, 08 Mar 2012)
New Revision: 12270
Modified:
trunk/hornetq-bootstrap/src/main/java/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
trunk/hornetq-commons/src/main/java/org/hornetq/core/logging/Logger.java
trunk/hornetq-commons/src/main/java/org/hornetq/utils/DataConstants.java
trunk/hornetq-commons/src/main/java/org/hornetq/utils/HornetQThreadFactory.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/MessageCounterInfo.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/TopologyMember.java
trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/Operator.java
trunk/hornetq-core/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
Log:
UCDetector: Reduce visibility of loads of fields/methods/classes
Modified: trunk/hornetq-bootstrap/src/main/java/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
===================================================================
--- trunk/hornetq-bootstrap/src/main/java/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-bootstrap/src/main/java/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -53,7 +53,7 @@
/**
* The arguments
*/
- protected String[] args;
+ private final String[] args;
private Properties properties;
@@ -77,7 +77,7 @@
/**
* Add a simple shutdown hook to stop the server.
*/
- public void addShutdownHook()
+ private void addShutdownHook()
{
String dirName = System.getProperty("hornetq.config.dir", ".");
final File file = new File(dirName + "/STOP_ME");
@@ -232,7 +232,7 @@
* @param url the deployment url
* @throws Throwable for any error
*/
- protected KernelDeployment deploy(final URL url) throws Throwable
+ private KernelDeployment deploy(final URL url) throws Throwable
{
HornetQBootstrapServer.log.debug("Deploying " + url);
KernelDeployment deployment = deployer.deploy(url);
@@ -271,7 +271,7 @@
properties = props;
}
- protected class Shutdown extends Thread
+ private final class Shutdown extends Thread
{
public Shutdown()
{
Modified: trunk/hornetq-commons/src/main/java/org/hornetq/core/logging/Logger.java
===================================================================
--- trunk/hornetq-commons/src/main/java/org/hornetq/core/logging/Logger.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-commons/src/main/java/org/hornetq/core/logging/Logger.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -24,11 +24,11 @@
import org.hornetq.utils.ClassloadingUtil;
/**
- *
+ *
* A Logger
- *
+ *
* This class allows us to isolate all our logging dependencies in one place
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
@@ -118,7 +118,7 @@
private final LogDelegate delegate;
- Logger(final LogDelegate delegate)
+ private Logger(final LogDelegate delegate)
{
this.delegate = delegate;
}
@@ -202,8 +202,8 @@
{
delegate.trace(message, t);
}
-
-
+
+
private static Object safeInitNewInstance(final String className)
{
return AccessController.doPrivileged(new PrivilegedAction<Object>()
Modified: trunk/hornetq-commons/src/main/java/org/hornetq/utils/DataConstants.java
===================================================================
--- trunk/hornetq-commons/src/main/java/org/hornetq/utils/DataConstants.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-commons/src/main/java/org/hornetq/utils/DataConstants.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -14,13 +14,13 @@
package org.hornetq.utils;
/**
- *
+ *
* A DataConstants
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
-public class DataConstants
+public final class DataConstants
{
public static final int SIZE_INT = 4;
@@ -36,7 +36,7 @@
public static final int SIZE_FLOAT = 4;
- public static final int SIZE_CHAR = 2;
+ static final int SIZE_CHAR = 2;
public static final byte TRUE = 1;
Modified: trunk/hornetq-commons/src/main/java/org/hornetq/utils/HornetQThreadFactory.java
===================================================================
--- trunk/hornetq-commons/src/main/java/org/hornetq/utils/HornetQThreadFactory.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-commons/src/main/java/org/hornetq/utils/HornetQThreadFactory.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -20,9 +20,9 @@
import org.hornetq.core.logging.Logger;
/**
- *
+ *
* A HornetQThreadFactory
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
@@ -37,22 +37,17 @@
private final int threadPriority;
private final boolean daemon;
-
+
private final ClassLoader tccl;
public HornetQThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl)
{
- this(groupName, Thread.NORM_PRIORITY, daemon, tccl);
- }
-
- public HornetQThreadFactory(final String groupName, final int threadPriority, final boolean daemon, final ClassLoader tccl)
- {
group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
-
- this.threadPriority = threadPriority;
+ this.threadPriority = Thread.NORM_PRIORITY;
+
this.tccl = tccl;
-
+
this.daemon = daemon;
}
@@ -69,7 +64,7 @@
{
t = new Thread(command, "Thread-" + threadCount.getAndIncrement());
}
-
+
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
public Object run()
@@ -79,7 +74,7 @@
return null;
}
});
-
+
try
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -29,27 +29,27 @@
// Attributes ----------------------------------------------------
- private String addressFullMessagePolicy;
+ private final String addressFullMessagePolicy;
- private long maxSizeBytes;
+ private final long maxSizeBytes;
- private int pageSizeBytes;
-
+ private final int pageSizeBytes;
+
private int pageCacheMaxSize;
- private int maxDeliveryAttempts;
+ private final int maxDeliveryAttempts;
- private long redeliveryDelay;
+ private final long redeliveryDelay;
- private String deadLetterAddress;
+ private final String deadLetterAddress;
- private String expiryAddress;
+ private final String expiryAddress;
- private boolean lastValueQueue;
+ private final boolean lastValueQueue;
- private long redistributionDelay;
+ private final long redistributionDelay;
- private boolean sendToDLAOnNoRoute;
+ private final boolean sendToDLAOnNoRoute;
// Static --------------------------------------------------------
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/MessageCounterInfo.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/MessageCounterInfo.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/MessageCounterInfo.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -23,7 +23,7 @@
/**
* Helper class to create Java Objects from the
* JSON serialization returned by {@link QueueControl#listMessageCounter()}.
- *
+ *
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
public class MessageCounterInfo
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -75,7 +75,7 @@
/**
* Returns the ObjectName used by AddressControl.
- *
+ *
* @see AddressControl
*/
public ObjectName getAddressObjectName(final SimpleString address) throws Exception
@@ -85,7 +85,7 @@
/**
* Returns the ObjectName used by QueueControl.
- *
+ *
* @see QueueControl
*/
public ObjectName getQueueObjectName(final SimpleString address, final SimpleString name) throws Exception
@@ -100,7 +100,7 @@
/**
* Returns the ObjectName used by DivertControl.
- *
+ *
* @see DivertControl
*/
public ObjectName getDivertObjectName(final String name) throws Exception
@@ -110,7 +110,7 @@
/**
* Returns the ObjectName used by AcceptorControl.
- *
+ *
* @see AcceptorControl
*/
public ObjectName getAcceptorObjectName(final String name) throws Exception
@@ -120,7 +120,7 @@
/**
* Returns the ObjectName used by BroadcastGroupControl.
- *
+ *
* @see BroadcastGroupControl
*/
public ObjectName getBroadcastGroupObjectName(final String name) throws Exception
@@ -130,7 +130,7 @@
/**
* Returns the ObjectName used by BridgeControl.
- *
+ *
* @see BridgeControl
*/
public ObjectName getBridgeObjectName(final String name) throws Exception
@@ -140,7 +140,7 @@
/**
* Returns the ObjectName used by ClusterConnectionControl.
- *
+ *
* @see ClusterConnectionControl
*/
public ObjectName getClusterConnectionObjectName(final String name) throws Exception
@@ -150,7 +150,7 @@
/**
* Returns the ObjectName used by DiscoveryGroupControl.
- *
+ *
* @see DiscoveryGroupControl
*/
public ObjectName getDiscoveryGroupObjectName(final String name) throws Exception
@@ -160,7 +160,7 @@
/**
* Returns the ObjectName used by JMSServerControl.
- *
+ *
* @see JMSServerControl
*/
public ObjectName getJMSServerObjectName() throws Exception
@@ -170,7 +170,7 @@
/**
* Returns the ObjectName used by JMSQueueControl.
- *
+ *
* @see JMSQueueControl
*/
public ObjectName getJMSQueueObjectName(final String name) throws Exception
@@ -180,7 +180,7 @@
/**
* Returns the ObjectName used by TopicControl.
- *
+ *
* @see TopicControl
*/
public ObjectName getJMSTopicObjectName(final String name) throws Exception
@@ -190,7 +190,7 @@
/**
* Returns the ObjectName used by ConnectionFactoryControl.
- *
+ *
* @see ConnectionFactoryControl
*/
public ObjectName getConnectionFactoryObjectName(final String name) throws Exception
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -49,14 +49,14 @@
{
// Constants
// ------------------------------------------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
- public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
+ private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
- public static final int NUM_PRIORITIES = 10;
+ private static final int NUM_PRIORITIES = 10;
public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
@@ -76,11 +76,11 @@
private final boolean browseOnly;
private final Executor sessionExecutor;
-
+
// For failover we can't send credits back
// while holding a lock or failover could dead lock eventually
// And we can't use the sessionExecutor as that's being used for message handlers
- // for that reason we have a separate flowControlExecutor that's using the thread pool
+ // for that reason we have a separate flowControlExecutor that's using the thread pool
// Which is a OrderedExecutor
private final Executor flowControlExecutor;
@@ -111,7 +111,7 @@
private volatile boolean closed;
private volatile int creditsToSend;
-
+
private volatile boolean failedOver;
private volatile Exception lastException;
@@ -127,13 +127,13 @@
private final SessionQueueQueryResponseMessage queueInfo;
private volatile boolean ackIndividually;
-
+
private final ClassLoader contextClassLoader;
// Constructors
// ---------------------------------------------------------------------------------
- public ClientConsumerImpl(final ClientSessionInternal session,
+ ClientConsumerImpl(final ClientSessionInternal session,
final long id,
final SimpleString queueName,
final SimpleString filterString,
@@ -168,9 +168,9 @@
this.ackBatchSize = ackBatchSize;
this.queueInfo = queueInfo;
-
+
this.contextClassLoader = contextClassLoader;
-
+
this.flowControlExecutor = flowControlExecutor;
}
@@ -281,7 +281,7 @@
failedOver = false;
}
}
-
+
if (callForceDelivery)
{
if (isTrace)
@@ -309,7 +309,7 @@
{
// forced delivery messages are discarded, nothing has been delivered by the queue
resetIfSlowConsumer();
-
+
if (isTrace)
{
log.trace("There was nothing on the queue, leaving it now:: returning null");
@@ -335,7 +335,7 @@
if (expired)
{
m.discardBody();
-
+
session.expire(id, m.getMessageID());
if (clientWindowSize == 0)
@@ -357,7 +357,7 @@
{
largeMessageReceived = m;
}
-
+
if (isTrace)
{
log.trace("Returning " + m);
@@ -385,12 +385,12 @@
public ClientMessage receive(final long timeout) throws HornetQException
{
ClientMessage msg = receive(timeout, false);
-
+
if (msg == null && !closed)
{
msg = receive(0, true);
}
-
+
return msg;
}
@@ -493,7 +493,7 @@
lastAckedMessage = null;
creditsToSend = 0;
-
+
failedOver = true;
ackIndividually = false;
@@ -518,7 +518,7 @@
{
return session;
}
-
+
public SessionQueueQueryResponseMessage getQueueInfo()
{
return queueInfo;
@@ -568,7 +568,7 @@
// consumed in, which means that acking all up to won't work
ackIndividually = true;
}
-
+
// Add it to the buffer
buffer.addTail(messageToHandle, messageToHandle.getPriority());
@@ -658,7 +658,7 @@
try
{
ClientMessageInternal message = iter.next();
-
+
if (message.isLargeMessage())
{
ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
@@ -674,7 +674,7 @@
}
clearBuffer();
-
+
try
{
if (currentLargeMessageController != null)
@@ -741,11 +741,11 @@
}
}
- /**
- *
+ /**
+ *
* LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
* So, this operation needs to be atomic.
- *
+ *
* @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
*/
public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
@@ -753,7 +753,7 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
-
+
if (creditsToSend >= clientWindowSize)
{
if (clientWindowSize == 0 && discountSlowConsumer)
@@ -806,7 +806,7 @@
// Private
// ---------------------------------------------------------------------------------------
- /**
+ /**
* Sending a initial credit for slow consumers
* */
private void startSlowConsumer()
@@ -833,7 +833,7 @@
latch.countDown();
}
});
-
+
try
{
latch.await(10, TimeUnit.SECONDS);
@@ -859,7 +859,7 @@
{
ClientConsumerImpl.log.trace("Adding Runner on Executor for delivery");
}
-
+
sessionExecutor.execute(runner);
}
@@ -916,7 +916,7 @@
{
return;
}
-
+
session.workDone();
// We pull the message from the buffer from inside the Runnable so we can ensure priority
@@ -935,7 +935,7 @@
{
rateLimiter.limit();
}
-
+
failedOver = false;
synchronized (this)
@@ -950,9 +950,9 @@
//Ignore, this could be a relic from a previous receiveImmediate();
return;
}
-
-
-
+
+
+
boolean expired = message.isExpired();
flowControlBeforeConsumption(message);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -41,7 +41,7 @@
private final int windowSize;
- public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize)
+ ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize)
{
this.session = session;
@@ -58,24 +58,24 @@
{
boolean needInit = false;
ClientProducerCredits credits;
-
+
synchronized(this)
{
credits = producerCredits.get(address);
-
+
if (credits == null)
{
// Doesn't need to be fair since session is single threaded
credits = new ClientProducerCreditsImpl(session, address, windowSize);
needInit = true;
-
+
producerCredits.put(address, credits);
}
-
+
if (!anon)
{
credits.incrementRefCount();
-
+
// Remove from anon credits (if there)
unReferencedCredits.remove(address);
}
@@ -84,7 +84,7 @@
addToUnReferencedCache(address, credits);
}
}
-
+
// The init is done outside of the lock
// otherwise packages may arrive with flow control
// while this is still sending requests causing a dead lock
@@ -92,7 +92,7 @@
{
credits.init();
}
-
+
return credits;
}
}
@@ -133,15 +133,15 @@
}
producerCredits.clear();
-
+
unReferencedCredits.clear();
}
-
+
public synchronized int creditsMapSize()
{
return producerCredits.size();
}
-
+
public synchronized int unReferencedCreditsSize()
{
return unReferencedCredits.size();
@@ -161,7 +161,7 @@
iter.remove();
- removeEntry(oldest.getKey(), oldest.getValue());
+ removeEntry(oldest.getKey(), oldest.getValue());
}
}
@@ -173,8 +173,8 @@
credits.close();
}
-
-
+
+
static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits
{
static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
@@ -216,7 +216,7 @@
public void releaseOutstanding()
{
}
-
+
}
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -166,7 +166,7 @@
// Constructors
// ---------------------------------------------------------------------------------
- public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
+ ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connectorConfig,
final long callTimeout,
final long callFailoverTimeout,
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -104,7 +104,7 @@
* $Id: ClientSessionImpl.java 3603 2008-01-21 18:49:20Z timfox $
*
*/
-public class ClientSessionImpl implements ClientSessionInternal, FailureListener, CommandConfirmationHandler
+class ClientSessionImpl implements ClientSessionInternal, FailureListener, CommandConfirmationHandler
{
// Constants ----------------------------------------------------------------------------
@@ -687,7 +687,7 @@
stop(true);
}
- public void stop(final boolean waitForOnMessage) throws HornetQException
+ private void stop(final boolean waitForOnMessage) throws HornetQException
{
checkClosed();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -40,7 +40,7 @@
*
*
*/
-public class CompressedLargeMessageControllerImpl implements LargeMessageController
+class CompressedLargeMessageControllerImpl implements LargeMessageController
{
// Constants -----------------------------------------------------
@@ -49,7 +49,7 @@
// Attributes ----------------------------------------------------
- final LargeMessageController bufferDelegate;
+ private final LargeMessageController bufferDelegate;
// Static --------------------------------------------------------
@@ -115,7 +115,7 @@
{
return -1;
}
-
+
DataInputStream dataInput = null;
private DataInputStream getStream()
@@ -1002,14 +1002,6 @@
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.buffers.ChannelBuffer#compareTo(org.hornetq.api.core.buffers.ChannelBuffer)
- */
- public int compareTo(final HornetQBuffer buffer)
- {
- return -1;
- }
-
public HornetQBuffer copy()
{
throw new UnsupportedOperationException();
@@ -1020,15 +1012,6 @@
throw new UnsupportedOperationException();
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- /**
- * @param body
- */
// Inner classes -------------------------------------------------
public ChannelBuffer channelBuffer()
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -62,7 +62,7 @@
private volatile SessionReceiveContinuationMessage currentPacket = null;
private final long totalSize;
-
+
private final int bufferSize;
private boolean streamEnded = false;
@@ -132,7 +132,7 @@
}
/**
- *
+ *
*/
public void discardUnusedPackets()
{
@@ -207,7 +207,7 @@
}
}
-
+
packets.offer(packet);
}
}
@@ -228,7 +228,7 @@
public synchronized void cancel()
{
-
+
int totalSize = 0;
Packet polledPacket = null;
while ((polledPacket = packets.poll()) != null)
@@ -245,7 +245,7 @@
// what else can we do here?
log.warn(ignored.getMessage(), ignored);
}
-
+
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
streamClosed = true;
@@ -308,7 +308,7 @@
}
/**
- *
+ *
* @param timeWait Milliseconds to Wait. 0 means forever
* @throws Exception
*/
@@ -453,9 +453,6 @@
dst.put(bytesToGet);
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.io.OutputStream, int)
- */
public void getBytes(final int index, final OutputStream out, final int length) throws IOException
{
byte bytesToGet[] = new byte[length];
@@ -463,26 +460,21 @@
out.write(bytesToGet);
}
- public void getBytes(final long index, final OutputStream out, final int length) throws IOException
+ private void getBytes(final long index, final OutputStream out, final int length) throws IOException
{
byte bytesToGet[] = new byte[length];
getBytes(index, bytesToGet);
out.write(bytesToGet);
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.channels.GatheringByteChannel, int)
- */
- public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+ private int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
{
byte bytesToGet[] = new byte[length];
getBytes(index, bytesToGet);
return out.write(ByteBuffer.wrap(bytesToGet));
}
- /* (non-Javadoc)
- * @see org.hornetq.api.core.buffers.ChannelBuffer#getInt(int)
- */
+ @Override
public int getInt(final int index)
{
return (getByte(index) & 0xff) << 24 | (getByte(index + 1) & 0xff) << 16 |
@@ -1060,7 +1052,7 @@
{
return (char)readShort();
}
-
+
public char getChar(final int index)
{
return (char)getShort(index);
@@ -1344,7 +1336,7 @@
{
throw new IllegalAccessError("Can't read the messageBody after setting outputStream");
}
-
+
if (index >= totalSize)
{
throw new IndexOutOfBoundsException();
@@ -1410,7 +1402,7 @@
}
readCachePositionStart = position / bufferSize * bufferSize;
-
+
cachedChannel.position(readCachePositionStart);
if (readCache == null)
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -294,7 +294,7 @@
return listenersCopy;
}
- public boolean removeMember(final long uniqueEventID, final String nodeId)
+ boolean removeMember(final long uniqueEventID, final String nodeId)
{
TopologyMember member;
@@ -460,7 +460,7 @@
return members;
}
- public synchronized int nodes()
+ synchronized int nodes()
{
int count = 0;
for (TopologyMember member : topology.values())
@@ -482,7 +482,7 @@
return describe("");
}
- public synchronized String describe(final String text)
+ private synchronized String describe(final String text)
{
StringBuilder desc = new StringBuilder(text + "topology on " + this + ":\n");
for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topology).entrySet())
@@ -497,7 +497,7 @@
return desc.toString();
}
- public int members()
+ private int members()
{
return topology.size();
}
@@ -510,7 +510,7 @@
this.owner = owner;
}
- public TransportConfiguration getBackupForConnector(final TransportConfiguration connectorConfiguration)
+ TransportConfiguration getBackupForConnector(final TransportConfiguration connectorConfiguration)
{
for (TopologyMember member : topology.values())
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/TopologyMember.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/TopologyMember.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -30,17 +30,12 @@
/** transient to avoid serialization changes */
private transient long uniqueEventID = System.currentTimeMillis();
- public TopologyMember(final Pair<TransportConfiguration, TransportConfiguration> connector)
+ public TopologyMember(final TransportConfiguration a, final TransportConfiguration b)
{
- this.connector = connector;
+ this.connector = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
uniqueEventID = System.currentTimeMillis();
}
- public TopologyMember(final TransportConfiguration a, final TransportConfiguration b)
- {
- this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
- }
-
public TransportConfiguration getA()
{
return connector.getA();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -49,21 +49,21 @@
private static final long serialVersionUID = 4077088945050267843L;
- public static final boolean DEFAULT_CLUSTERED = false;
+ static final boolean DEFAULT_CLUSTERED = false;
- public static final boolean DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY = false;
+ static final boolean DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY = false;
- public static final boolean DEFAULT_BACKUP = false;
+ static final boolean DEFAULT_BACKUP = false;
- public static final boolean DEFAULT_ALLOW_AUTO_FAILBACK = true;
+ private static final boolean DEFAULT_ALLOW_AUTO_FAILBACK = true;
- public static final boolean DEFAULT_SHARED_STORE = true;
+ static final boolean DEFAULT_SHARED_STORE = true;
- public static final boolean DEFAULT_FILE_DEPLOYMENT_ENABLED = false;
+ static final boolean DEFAULT_FILE_DEPLOYMENT_ENABLED = false;
- public static final boolean DEFAULT_PERSISTENCE_ENABLED = true;
+ static final boolean DEFAULT_PERSISTENCE_ENABLED = true;
- public static final long DEFAULT_FILE_DEPLOYER_SCAN_PERIOD = 5000;
+ static final long DEFAULT_FILE_DEPLOYER_SCAN_PERIOD = 5000;
public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
@@ -79,7 +79,7 @@
public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
- public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;
+ static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;
public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
@@ -90,7 +90,7 @@
public static final String DEFAULT_PAGING_DIR = "data/paging";
public static final String DEFAULT_LARGE_MESSAGES_DIR = "data/largemessages";
-
+
public static final int DEFAULT_MAX_CONCURRENT_PAGE_IO = 5;
public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
@@ -161,7 +161,7 @@
public static final int DEFAULT_ID_CACHE_SIZE = 2000;
public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
-
+
public static final boolean DEFAULT_CLUSTER_DUPLICATE_DETECTION = true;
public static final boolean DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS = false;
@@ -171,13 +171,13 @@
public static final long DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
public static final int DEFAULT_CLUSTER_RECONNECT_ATTEMPTS = -1;
-
+
public static final long DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
+
public static final long DEFAULT_CLUSTER_CONNECTION_TTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
+
public static final double DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
+
public static final long DEFAULT_CLUSTER_MAX_RETRY_INTERVAL = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
@@ -199,32 +199,33 @@
public static final String DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME = JULLogDelegateFactory.class.getCanonicalName();
// Attributes -----------------------------------------------------------------------------
-
- protected String name = "ConfigurationImpl::" + System.identityHashCode(this);
+ private String name = "ConfigurationImpl::" + System.identityHashCode(this);
+
protected boolean clustered = ConfigurationImpl.DEFAULT_CLUSTERED;
protected boolean backup = ConfigurationImpl.DEFAULT_BACKUP;
- protected boolean allowAutoFailBack = ConfigurationImpl.DEFAULT_ALLOW_AUTO_FAILBACK;
+ private boolean allowAutoFailBack = ConfigurationImpl.DEFAULT_ALLOW_AUTO_FAILBACK;
- protected boolean sharedStore = ConfigurationImpl.DEFAULT_SHARED_STORE;
+ private boolean sharedStore = ConfigurationImpl.DEFAULT_SHARED_STORE;
protected boolean fileDeploymentEnabled = ConfigurationImpl.DEFAULT_FILE_DEPLOYMENT_ENABLED;
- protected boolean persistenceEnabled = ConfigurationImpl.DEFAULT_PERSISTENCE_ENABLED;
+ private boolean persistenceEnabled = ConfigurationImpl.DEFAULT_PERSISTENCE_ENABLED;
protected long fileDeploymentScanPeriod = ConfigurationImpl.DEFAULT_FILE_DEPLOYER_SCAN_PERIOD;
- protected boolean persistDeliveryCountBeforeDelivery = ConfigurationImpl.DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY;
+ private boolean persistDeliveryCountBeforeDelivery =
+ ConfigurationImpl.DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY;
- protected int scheduledThreadPoolMaxSize = ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+ private int scheduledThreadPoolMaxSize = ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
- protected int threadPoolMaxSize = ConfigurationImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+ private int threadPoolMaxSize = ConfigurationImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
- protected long securityInvalidationInterval = ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL;
+ private long securityInvalidationInterval = ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL;
- protected boolean securityEnabled = ConfigurationImpl.DEFAULT_SECURITY_ENABLED;
+ private boolean securityEnabled = ConfigurationImpl.DEFAULT_SECURITY_ENABLED;
protected boolean jmxManagementEnabled = ConfigurationImpl.DEFAULT_JMX_MANAGEMENT_ENABLED;
@@ -234,21 +235,21 @@
protected boolean asyncConnectionExecutionEnabled = ConfigurationImpl.DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED;
- protected long messageExpiryScanPeriod = ConfigurationImpl.DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD;
+ private long messageExpiryScanPeriod = ConfigurationImpl.DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD;
- protected int messageExpiryThreadPriority = ConfigurationImpl.DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
+ private int messageExpiryThreadPriority = ConfigurationImpl.DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
protected int idCacheSize = ConfigurationImpl.DEFAULT_ID_CACHE_SIZE;
- protected boolean persistIDCache = ConfigurationImpl.DEFAULT_PERSIST_ID_CACHE;
+ private boolean persistIDCache = ConfigurationImpl.DEFAULT_PERSIST_ID_CACHE;
protected String logDelegateFactoryClassName = ConfigurationImpl.DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME;
- protected List<String> interceptorClassNames = new ArrayList<String>();
-
+ private List<String> interceptorClassNames = new ArrayList<String>();
+
protected Map<String, TransportConfiguration> connectorConfigs = new HashMap<String, TransportConfiguration>();
- protected Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
+ private Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
protected String liveConnectorName;
@@ -258,7 +259,7 @@
protected List<ClusterConnectionConfiguration> clusterConfigurations = new ArrayList<ClusterConnectionConfiguration>();
- protected List<CoreQueueConfiguration> queueConfigurations = new ArrayList<CoreQueueConfiguration>();
+ private List<CoreQueueConfiguration> queueConfigurations = new ArrayList<CoreQueueConfiguration>();
protected List<BroadcastGroupConfiguration> broadcastGroupConfigurations = new ArrayList<BroadcastGroupConfiguration>();
@@ -266,12 +267,12 @@
// Paging related attributes ------------------------------------------------------------
- protected String pagingDirectory = ConfigurationImpl.DEFAULT_PAGING_DIR;
+ private String pagingDirectory = ConfigurationImpl.DEFAULT_PAGING_DIR;
// File related attributes -----------------------------------------------------------
-
- protected int maxConcurrentPageIO = ConfigurationImpl.DEFAULT_MAX_CONCURRENT_PAGE_IO;
+ private int maxConcurrentPageIO = ConfigurationImpl.DEFAULT_MAX_CONCURRENT_PAGE_IO;
+
protected String largeMessagesDirectory = ConfigurationImpl.DEFAULT_LARGE_MESSAGES_DIR;
protected String bindingsDirectory = ConfigurationImpl.DEFAULT_BINDINGS_DIRECTORY;
@@ -316,34 +317,34 @@
protected boolean runSyncSpeedTest = ConfigurationImpl.DEFAULT_RUN_SYNC_SPEED_TEST;
- protected boolean wildcardRoutingEnabled = ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED;
+ private boolean wildcardRoutingEnabled = ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED;
- protected boolean messageCounterEnabled = ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_ENABLED;
+ private boolean messageCounterEnabled = ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_ENABLED;
- protected long messageCounterSamplePeriod = ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_SAMPLE_PERIOD;
+ private long messageCounterSamplePeriod = ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_SAMPLE_PERIOD;
- protected int messageCounterMaxDayHistory = ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_MAX_DAY_HISTORY;
+ private int messageCounterMaxDayHistory = ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_MAX_DAY_HISTORY;
- protected long transactionTimeout = ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT;
+ private long transactionTimeout = ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT;
- protected long transactionTimeoutScanPeriod = ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
+ private long transactionTimeoutScanPeriod = ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
- protected SimpleString managementAddress = ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
+ private SimpleString managementAddress = ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS;
- protected SimpleString managementNotificationAddress = ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+ private SimpleString managementNotificationAddress = ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
protected String clusterUser = ConfigurationImpl.DEFAULT_CLUSTER_USER;
protected String clusterPassword = ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD;
- protected long serverDumpInterval = ConfigurationImpl.DEFAULT_SERVER_DUMP_INTERVAL;
+ private long serverDumpInterval = ConfigurationImpl.DEFAULT_SERVER_DUMP_INTERVAL;
protected boolean failoverOnServerShutdown = ConfigurationImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
// percentage of free memory which triggers warning from the memory manager
- protected int memoryWarningThreshold = ConfigurationImpl.DEFAULT_MEMORY_WARNING_THRESHOLD;
+ private int memoryWarningThreshold = ConfigurationImpl.DEFAULT_MEMORY_WARNING_THRESHOLD;
- protected long memoryMeasureInterval = ConfigurationImpl.DEFAULT_MEMORY_MEASURE_INTERVAL;
+ private long memoryMeasureInterval = ConfigurationImpl.DEFAULT_MEMORY_MEASURE_INTERVAL;
protected GroupingHandlerConfiguration groupingHandlerConfiguration;
@@ -529,7 +530,7 @@
{
this.liveConnectorName = liveConnectorName;
}
-
+
public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
{
return groupingHandlerConfiguration;
@@ -629,8 +630,8 @@
{
bindingsDirectory = dir;
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.config.Configuration#getPageMaxConcurrentIO()
*/
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -80,7 +80,7 @@
private static final String ROLES_ATTR_NAME = "roles";
- private static final String CREATEDURABLEQUEUE_NAME = "createDurableQueue";
+ static final String CREATEDURABLEQUEUE_NAME = "createDurableQueue";
private static final String DELETEDURABLEQUEUE_NAME = "deleteDurableQueue";
@@ -431,8 +431,8 @@
"journal-directory",
config.getJournalDirectory(),
Validators.NOT_NULL_OR_EMPTY));
-
-
+
+
config.setPageMaxConcurrentIO(XMLConfigurationUtil.getInteger(e,
"page-max-concurrent-io",
5,
@@ -1016,7 +1016,7 @@
"max-hops",
ConfigurationImpl.DEFAULT_CLUSTER_MAX_HOPS,
Validators.GE_ZERO);
-
+
long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(e, "check-period",
ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD, Validators.GT_ZERO) ;
@@ -1028,18 +1028,18 @@
"retry-interval",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
Validators.GT_ZERO);
-
+
long callTimeout = XMLConfigurationUtil.getLong(e, "call-timeout", HornetQClient.DEFAULT_CALL_TIMEOUT, Validators.GT_ZERO);
long callFailoverTimeout = XMLConfigurationUtil.getLong(e, "call-failover-timeout", HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, Validators.MINUS_ONE_OR_GT_ZERO);
-
- double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier",
+
+ double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
-
+
int minLargeMessageSize = XMLConfigurationUtil.getInteger(e, "min-large-message-size", HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, Validators.GT_ZERO);
-
+
long maxRetryInterval = XMLConfigurationUtil.getLong(e, "max-retry-interval", ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
-
+
int reconnectAttempts = XMLConfigurationUtil.getInteger(e, "reconnect-attempts", ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS, Validators.MINUS_ONE_OR_GE_ZERO);
@@ -1167,15 +1167,15 @@
long connectionTTL = XMLConfigurationUtil.getLong(brNode, "connection-ttl",
HornetQClient.DEFAULT_CONNECTION_TTL, Validators.GT_ZERO) ;
-
+
int minLargeMessageSize = XMLConfigurationUtil.getInteger(brNode,
"min-large-message-size",
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
Validators.GT_ZERO);
-
+
long maxRetryInterval = XMLConfigurationUtil.getLong(brNode, "max-retry-interval", HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
-
+
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(brNode,
"retry-interval-multiplier",
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/Operator.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/Operator.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/filter/impl/Operator.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -46,7 +46,7 @@
Object oper3;
- Object arg1;
+ private Object arg1;
Object arg2;
@@ -81,7 +81,7 @@
public final static int DIFFERENT = 8;
- public final static int ADD = 9;
+ final static int ADD = 9;
public final static int SUB = 10;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/message/impl/MessageImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/message/impl/MessageImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -24,7 +24,6 @@
import org.hornetq.api.core.PropertyConversionException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.utils.DataConstants;
@@ -47,17 +46,13 @@
*/
public abstract class MessageImpl implements MessageInternal
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(MessageImpl.class);
-
public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
-
- // used by the bridges to set duplicates
+
+ // used by the bridges to set duplicates
public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_HQ_BRIDGE_DUP");
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
-
+
public static final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
@@ -70,7 +65,7 @@
protected boolean durable;
/** GMT milliseconds at which this message expires. 0 means never expires * */
- protected long expiration;
+ private long expiration;
protected long timestamp;
@@ -91,7 +86,7 @@
private boolean copied = true;
private boolean bufferUsed;
-
+
private UUID userID;
// Constructors --------------------------------------------------
@@ -201,7 +196,7 @@
{
return DataConstants.SIZE_LONG + // Message ID
DataConstants.SIZE_BYTE + // user id null?
- (userID == null ? 0 : 16) +
+ (userID == null ? 0 : 16) +
/* address */SimpleString.sizeofNullableString(address) +
DataConstants./* Type */SIZE_BYTE +
DataConstants./* Durable */SIZE_BOOLEAN +
@@ -210,8 +205,8 @@
DataConstants./* Priority */SIZE_BYTE +
/* PropertySize and Properties */properties.getEncodeSize();
}
-
+
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
{
buffer.writeLong(messageID);
@@ -254,7 +249,7 @@
priority = buffer.readByte();
properties.decode(buffer);
}
-
+
public void copyHeadersAndProperties(final MessageInternal msg)
{
messageID = msg.getMessageID();
@@ -267,7 +262,7 @@
priority = msg.getPriority();
properties = msg.getTypedProperties();
}
-
+
public HornetQBuffer getBodyBuffer()
{
if (bodyBuffer == null)
@@ -282,18 +277,18 @@
{
return messageID;
}
-
+
public UUID getUserID()
{
return userID;
}
-
+
public void setUserID(final UUID userID)
{
this.userID = userID;
}
- /** this doesn't need to be synchronized as setAddress is protecting the buffer,
+ /** this doesn't need to be synchronized as setAddress is protecting the buffer,
* not the address*/
public SimpleString getAddress()
{
@@ -313,7 +308,7 @@
if (this.address != address)
{
this.address = address;
-
+
bufferValid = false;
}
}
@@ -513,7 +508,7 @@
return buffer;
}
}
-
+
public void setAddressTransient(final SimpleString address)
{
this.address = address;
@@ -585,7 +580,7 @@
bufferValid = false;
}
-
+
public void putObjectProperty(final SimpleString key, final Object value) throws PropertyConversionException
{
if (value == null)
@@ -826,7 +821,7 @@
{
return properties.getSimpleStringProperty(new SimpleString(key));
}
-
+
public Object getObjectProperty(final String key)
{
return properties.getProperty(new SimpleString(key));
@@ -870,7 +865,7 @@
{
return new DecodingContext();
}
-
+
public TypedProperties getTypedProperties()
{
return this.properties;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -540,7 +540,7 @@
/**
* Finalize day counter hour array elements
*/
- void finalizeDayCounter()
+ private void finalizeDayCounter()
{
// a new day has began, so fill all array elements from index to end with
// '0' values
@@ -571,7 +571,7 @@
* "Date, hour counter 0, hour counter 1, ..., hour counter 23".
* @return String day counter data
*/
- String getDayCounterAsString()
+ private String getDayCounterAsString()
{
// first element day counter date
DateFormat dateFormat = DateFormat.getDateInstance(DateFormat.SHORT);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -57,10 +57,10 @@
/**
* A PageCursorImpl
*
- * A page cursor will always store its
+ * A page cursor will always store its
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*/
-public class PageSubscriptionImpl implements PageSubscription
+class PageSubscriptionImpl implements PageSubscription
{
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(PageSubscriptionImpl.class);
@@ -97,21 +97,19 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
-
+
private final Executor executor;
private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
-
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageSubscriptionImpl(final PageCursorProvider cursorProvider,
+ PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
final Executor executor,
@@ -210,7 +208,7 @@
}
}
- /**
+ /**
* It will cleanup all the records for completed pages
* */
public void cleanupEntries(final boolean completeDelete) throws Exception
@@ -228,7 +226,7 @@
// First get the completed pages using a lock
synchronized (this)
{
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
PageCursorInfo info = entry.getValue();
if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
@@ -294,7 +292,7 @@
}
}
- if (!completeDelete)
+ if (!completeDelete)
{
cursorProvider.scheduleCleanup();
}
@@ -380,7 +378,7 @@
}
/**
- *
+ *
*/
private synchronized PagePosition getStartPosition()
{
@@ -412,7 +410,7 @@
}
}
- if (isTrace)
+ if (isTrace)
{
trace("Returning initial position " + retValue);
}
@@ -535,7 +533,7 @@
}
}
- /**
+ /**
* Theres no need to synchronize this method as it's only called from journal load on startup
*/
public void reloadACK(final PagePosition position)
@@ -565,7 +563,7 @@
processACK(position);
}
-
+
public void lateDeliveryRollback(PagePosition position)
{
PageCursorInfo cursorInfo = processACK(position);
@@ -589,9 +587,9 @@
final long tx = store.generateUniqueID();
try
{
-
+
boolean isPersistent = false;
-
+
synchronized (PageSubscriptionImpl.this)
{
for (PageCursorInfo cursor : consumedPages.values())
@@ -606,12 +604,12 @@
}
}
}
-
+
if (isPersistent)
{
store.commit(tx);
}
-
+
cursorProvider.close(this);
}
catch (Exception e)
@@ -761,7 +759,7 @@
// Protected -----------------------------------------------------
- protected boolean match(final ServerMessage message)
+ private boolean match(final ServerMessage message)
{
if (filter == null)
{
@@ -791,7 +789,7 @@
{
log.trace("Scheduling cleanup on pageSubscription for address = " + pageStore.getAddress() + " queue = " + this.getQueue().getName());
}
-
+
// there's a different page being acked, we will do the check right away
if (autoCleanup)
{
@@ -803,7 +801,7 @@
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
-
+
return info;
}
@@ -860,7 +858,7 @@
// Inner classes -------------------------------------------------
- /**
+ /**
* This will hold information about the pending ACKs towards a page.
* This instance will be released as soon as the entire page is consumed, releasing the memory at that point
* The ref counts are increased also when a message is ignored for any reason.
@@ -985,7 +983,7 @@
}
/**
- *
+ *
*/
protected void checkDone()
{
@@ -1019,11 +1017,12 @@
}
- static class PageCursorTX extends TransactionOperationAbstract
+ private static class PageCursorTX extends TransactionOperationAbstract
{
- HashMap<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<PageSubscriptionImpl, List<PagePosition>>();
+ private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions =
+ new HashMap<PageSubscriptionImpl, List<PagePosition>>();
- public void addPositionConfirmation(final PageSubscriptionImpl cursor, final PagePosition position)
+ private void addPositionConfirmation(final PageSubscriptionImpl cursor, final PagePosition position)
{
List<PagePosition> list = pendingPositions.get(cursor);
@@ -1036,9 +1035,6 @@
list.add(position);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
- */
@Override
public void afterCommit(final Transaction tx)
{
@@ -1057,9 +1053,6 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
- */
@Override
public List<MessageReference> getRelatedMessageReferences()
{
@@ -1068,7 +1061,7 @@
}
- class CursorIterator implements LinkedListIterator<PagedReference>
+ private class CursorIterator implements LinkedListIterator<PagedReference>
{
private PagePosition position = null;
@@ -1188,9 +1181,9 @@
{
ignored = true;
}
-
+
PageCursorInfo info = getPageInfo(message.getPosition(), false);
-
+
if (info != null && info.isRemoved(message.getPosition()))
{
continue;
@@ -1226,7 +1219,7 @@
// Say you have a Browser that will only read the files... there's no need to control PageCursors is
// nothing
// is being changed. That's why the false is passed as a parameter here
-
+
if (info != null && info.isRemoved(message.getPosition()))
{
valid = false;
@@ -1263,7 +1256,7 @@
}
}
- /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
+ /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
* It would be a rare race condition but I would prefer avoiding that scenario */
public synchronized boolean hasNext()
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -32,7 +32,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
/**
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
@@ -63,7 +63,7 @@
private final StorageManager storageManager;
- private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions =
+ private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions =
new ConcurrentHashMap<Long, PageTransactionInfo>();
// Static
@@ -88,9 +88,9 @@
// Public
// ---------------------------------------------------------------------------------------------------------------------------
-
+
// Hierarchical changes listener
-
+
/* (non-Javadoc)
* @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
*/
@@ -100,11 +100,11 @@
}
-
+
// PagingManager implementation
// -----------------------------------------------------------------------------------------------------
- public void reaplySettings()
+ private void reaplySettings()
{
for (PagingStore store : stores.values())
{
@@ -112,7 +112,7 @@
store.applySetting(settings);
}
}
-
+
public SimpleString[] getStoreNames()
{
Set<SimpleString> names = stores.keySet();
@@ -202,7 +202,7 @@
}
return transactions.get(id);
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.PagingManager#getTransactions()
*/
@@ -302,12 +302,12 @@
syncLock.readLock().unlock();
}
}
-
+
protected PagingStoreFactory getStoreFactory()
{
return pagingStoreFactory;
}
-
+
public void unlock()
{
syncLock.writeLock().unlock();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -97,7 +97,7 @@
private long pageSize;
- private AddressFullMessagePolicy addressFullMessagePolicy;
+ private volatile AddressFullMessagePolicy addressFullMessagePolicy;
private boolean printedDropMessagesWarning;
@@ -217,6 +217,7 @@
// Public --------------------------------------------------------
+ @Override
public String toString()
{
return "PagingStoreImpl(" + this.address + ")";
@@ -389,18 +390,17 @@
return pagingManager;
}
- // HornetQComponent implementation
-
+ @Override
public boolean isStarted()
{
return running;
}
+ @Override
public synchronized void stop() throws Exception
{
if (running)
{
-
cursorProvider.stop();
running = false;
@@ -429,6 +429,7 @@
}
}
+ @Override
public void start() throws Exception
{
lock.writeLock().lock();
@@ -727,13 +728,13 @@
private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
- static final class OurRunnable implements Runnable
+ private static final class OurRunnable implements Runnable
{
- boolean ran;
+ private boolean ran;
- final Runnable runnable;
+ private final Runnable runnable;
- OurRunnable(final Runnable runnable)
+ private OurRunnable(final Runnable runnable)
{
this.runnable = runnable;
}
@@ -826,7 +827,10 @@
}
- protected boolean page(ServerMessage message, final RoutingContext ctx, RouteContextList listCtx, final boolean sync) throws Exception
+ private
+ boolean
+ page(ServerMessage message, final RoutingContext ctx, RouteContextList listCtx, final boolean sync)
+ throws Exception
{
if (!running)
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -333,7 +333,7 @@
// Protected -----------------------------------------------------
- protected int stringEncodeSize(final String str)
+ private int stringEncodeSize(final String str)
{
return DataConstants.SIZE_INT + str.length() * 2;
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -49,7 +49,7 @@
// ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(RemotingConnectionImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
// Static
@@ -75,7 +75,7 @@
private volatile boolean destroyed;
private final boolean client;
-
+
private int clientVersion;
// Channels 0-9 are reserved for the system
@@ -93,13 +93,13 @@
private volatile boolean dataReceived;
private final Executor executor;
-
+
private volatile boolean executing;
-
+
private final SimpleString nodeID;
private final long creationTime;
-
+
private String clientID;
// Constructors
@@ -119,7 +119,7 @@
/*
* Create a server side connection
*/
- public RemotingConnectionImpl(final Connection transportConnection,
+ RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
final Executor executor,
final SimpleString nodeID)
@@ -148,21 +148,18 @@
this.client = client;
this.executor = executor;
-
+
this.nodeID = nodeID;
-
+
this.creationTime = System.currentTimeMillis();
}
-
-
-
+
+
+
// RemotingConnection implementation
// ------------------------------------------------------------
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
@@ -190,7 +187,7 @@
failureListeners.addAll(listeners);
}
-
+
/**
* @return the clientVersion
*/
@@ -216,7 +213,7 @@
{
return transportConnection.getRemoteAddress();
}
-
+
public long getCreationTime()
{
return creationTime;
@@ -289,26 +286,26 @@
public List<CloseListener> removeCloseListeners()
{
List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
-
+
closeListeners.clear();
-
+
return ret;
}
public List<FailureListener> removeFailureListeners()
{
List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
-
+
failureListeners.clear();
-
- return ret;
+
+ return ret;
}
public void setCloseListeners(List<CloseListener> listeners)
{
closeListeners.clear();
-
- closeListeners.addAll(listeners);
+
+ closeListeners.addAll(listeners);
}
public HornetQBuffer createBuffer(final int size)
@@ -365,7 +362,7 @@
callClosingListeners();
}
-
+
public void disconnect(final boolean criticalError)
{
Channel channel0 = getChannel(0, -1);
@@ -373,7 +370,7 @@
// And we remove all channels from the connection, this ensures no more packets will be processed after this
// method is
// complete
-
+
Set<Channel> allChannels = new HashSet<Channel>(channels.values());
if (!criticalError)
@@ -388,7 +385,7 @@
}
// Now we are 100% sure that no more packets will be processed we can flush then send the disconnect
-
+
if (!criticalError)
{
for (Channel channel: allChannels)
@@ -468,7 +465,7 @@
}
}
}
-
+
public void checkFlushBatchBuffer()
{
transportConnection.checkFlushBatchBuffer();
@@ -487,16 +484,16 @@
try
{
final Packet packet = PacketDecoder.decode(buffer);
-
+
if (isTrace)
{
log.trace("handling packet " + packet);
}
-
+
if (packet.isAsyncExec() && executor != null)
{
executing = true;
-
+
executor.execute(new Runnable()
{
public void run()
@@ -509,7 +506,7 @@
{
RemotingConnectionImpl.log.error("Unexpected error", t);
}
-
+
executing = false;
}
});
@@ -521,13 +518,13 @@
{
Thread.yield();
}
-
+
// Pings must always be handled out of band so we can send pings back to the client quickly
// otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}
-
- dataReceived = true;
+
+ dataReceived = true;
}
catch (Exception e)
{
@@ -585,8 +582,8 @@
{
channels.clear();
}
- }
-
+ }
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
@@ -642,7 +639,7 @@
{
clientID = cID;
}
-
+
public String getClientID()
{
return clientID;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/LiveIsStoppingMessage.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -17,5 +17,4 @@
{
super(PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
}
-
}
Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalTransaction.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -425,9 +425,9 @@
}
}
- static class JournalUpdate
+ private static class JournalUpdate
{
- JournalFile file;
+ private final JournalFile file;
long id;
@@ -438,7 +438,7 @@
* @param id
* @param size
*/
- public JournalUpdate(final JournalFile file, final long id, final int size)
+ private JournalUpdate(final JournalFile file, final long id, final int size)
{
super();
this.file = file;
Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -37,7 +37,7 @@
{
private final boolean isCommit;
- public final long txID;
+ private final long txID;
private final EncodingSupport transactionData;
Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2012-03-08 16:07:11 UTC (rev 12269)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2012-03-08 16:08:38 UTC (rev 12270)
@@ -25,7 +25,7 @@
*/
public class JournalRollbackRecordTX extends JournalInternalRecord
{
- public final long txID;
+ private final long txID;
public JournalRollbackRecordTX(final long txID)
{
12 years, 2 months
JBoss hornetq SVN: r12269 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-08 11:07:11 -0500 (Thu, 08 Mar 2012)
New Revision: 12269
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
Log:
HORNETQ-776 HORNETQ-720 Improve QuorumVoting code and tests. (Unfinished)
Note that the code currently avoids QuorumVoting even for this test!
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -2108,7 +2108,7 @@
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
- final QuorumManager quorumManager = new QuorumManager(serverLocator0);
+ final QuorumManager quorumManager = new QuorumManager(serverLocator0, threadPool);
replicationEndpoint.setQuorumManager(quorumManager);
serverLocator0.setReconnectAttempts(-1);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QuorumManager.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -7,7 +7,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,12 +34,15 @@
private final Map<String, Pair<TransportConfiguration, TransportConfiguration>> nodes =
new ConcurrentHashMap<String, Pair<TransportConfiguration, TransportConfiguration>>();
+ private final ExecutorService executor;
+
/** safety parameter to make _sure_ we get out of await() */
private static final int LATCH_TIMEOUT = 60;
private static final long DISCOVERY_TIMEOUT = 5;
- public QuorumManager(ServerLocator serverLocator)
+ public QuorumManager(ServerLocator serverLocator, ExecutorService executor)
{
+ this.executor = executor;
this.locator = serverLocator;
locator.addClusterTopologyListener(this);
}
@@ -81,11 +83,10 @@
{
return true;
}
- // go for the vote...
+
final int size = nodes.size();
Set<ServerLocator> locatorsList = new HashSet<ServerLocator>(size);
AtomicInteger pingCount = new AtomicInteger(0);
- ExecutorService pool = Executors.newFixedThreadPool(size);
final CountDownLatch latch = new CountDownLatch(size);
try
{
@@ -96,7 +97,7 @@
TransportConfiguration serverTC = pair.getValue().getA();
ServerLocatorImpl locator = (ServerLocatorImpl)HornetQClient.createServerLocatorWithoutHA(serverTC);
locatorsList.add(locator);
- pool.submit(new ServerConnect(latch, pingCount, locator));
+ executor.submit(new ServerConnect(latch, pingCount, locator));
}
// Some servers may have disappeared between the latch creation
for (int i = 0; i < size - locatorsList.size(); i++)
@@ -125,7 +126,6 @@
// no-op
}
}
- pool.shutdownNow();
}
}
@@ -145,7 +145,7 @@
@Override
public void run()
{
- locator.setReconnectAttempts(-1);
+ locator.setReconnectAttempts(0);
locator.getDiscoveryGroupConfiguration().setDiscoveryInitialWaitTimeout(DISCOVERY_TIMEOUT);
final ClientSessionFactory liveServerSessionFactory;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -17,27 +17,37 @@
setupCluster();
startServers(0, 1, 2, 3, 4, 5);
+
+ for (int i = 0; i < 3; i++)
+ {
+ waitForTopology(servers[i], 3, 3);
+ }
+
+ waitForFailoverTopology(3, 0, 1, 2);
+ waitForFailoverTopology(4, 0, 1, 2);
+ waitForFailoverTopology(5, 0, 1, 2);
+
for (int i : new int[] { 0, 1, 2 })
{
setupSessionFactory(i, i + 3, isNetty(), false);
}
createQueue(0, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+ addConsumer(0, 0, QUEUE_NAME, null);
+ waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
locators[0].addClusterTopologyListener(liveTopologyListener);
- final TopologyListener backupTopologyListener = new TopologyListener("LIVE-2");
- locators[1].addClusterTopologyListener(backupTopologyListener);
-
assertTrue("we assume 3 is a backup", servers[3].getConfiguration().isBackup());
assertFalse("no shared storage", servers[3].getConfiguration().isSharedStore());
- // assertEquals(liveTopologyListener.toString(), 6, liveTopologyListener.nodes.size());
- // assertEquals(backupTopologyListener.toString(), 6, backupTopologyListener.nodes.size());
+ failNode(0);
- failNode(0);
+ waitForFailoverTopology(4, 3, 1, 2);
+ waitForFailoverTopology(5, 3, 1, 2);
+
waitForBindings(3, QUEUES_TESTADDRESS, 1, 1, true);
assertTrue(servers[3].waitForInitialization(10, TimeUnit.SECONDS));
@@ -67,14 +77,12 @@
Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
nodes.put(nodeID, connectorPair);
- System.out.println(prefix + " UP: " + nodeID + " connectPair=" + connectorPair);
}
@Override
public void nodeDown(long eventUID, String nodeID)
{
nodes.remove(nodeID);
- System.out.println(prefix + " DOWN: " + nodeID);
}
@Override
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java 2012-03-08 16:06:44 UTC (rev 12268)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java 2012-03-08 16:07:11 UTC (rev 12269)
@@ -23,15 +23,6 @@
public class StaticClusterWithBackupFailoverTest extends ClusterWithBackupFailoverTestBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
@Override
protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
{
12 years, 2 months
JBoss hornetq SVN: r12268 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-08 11:06:44 -0500 (Thu, 08 Mar 2012)
New Revision: 12268
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
Fix tests leaving serverLocator open.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2012-03-08 16:06:28 UTC (rev 12267)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2012-03-08 16:06:44 UTC (rev 12268)
@@ -204,7 +204,7 @@
Assert.assertTrue("connection close listeners not fired", latch.await(2 * TemporaryQueueTest.CONNECTION_TTL,
TimeUnit.MILLISECONDS));
- sf = locator.createSessionFactory();
+ sf = addSessionFactory(locator.createSessionFactory());
session = sf.createSession(false, true, true);
session.start();
@@ -343,7 +343,7 @@
serverWithReattach.setReconnectAttempts(-1);
serverWithReattach.setRetryInterval(1000);
serverWithReattach.setConfirmationWindowSize(-1);
- ClientSessionFactory reattachSF = serverWithReattach.createSessionFactory();
+ ClientSessionFactory reattachSF = createSessionFactory(serverWithReattach);
ClientSession session = reattachSF.createSession(false, false);
session.createTemporaryQueue("tmpAd", "tmpQ");
@@ -428,7 +428,7 @@
// Will be using a single Session as this is how an issue was raised
for (int i = 0 ; i < iterations; i++)
{
- ClientSessionFactory clientsConnecton = locator.createSessionFactory();
+ ClientSessionFactory clientsConnecton = addSessionFactory(locator.createSessionFactory());
ClientSession localSession = clientsConnecton.createSession();
ClientProducer prod = localSession.createProducer(address);
@@ -481,8 +481,6 @@
}
finally
{
-// sessConsumerRed.close();
-// sessConsumerBlue.close();
localSession.close();
clientsConnecton.close();
}
@@ -514,9 +512,9 @@
}
});
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ ServerLocator locator = createInVMNonHALocator();
locator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
- sf = locator.createSessionFactory();
+ sf = addSessionFactory(locator.createSessionFactory());
session = sf.createSession(false, true, true);
session.createTemporaryQueue(address, queue);
@@ -548,8 +546,9 @@
session.close();
sf.close();
- ServerLocator locator2 = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
- sf = locator2.createSessionFactory();
+ ServerLocator locator2 = createInVMNonHALocator();
+
+ sf = addSessionFactory(locator2.createSessionFactory());
session = sf.createSession(false, true, true);
session.start();
@@ -695,9 +694,4 @@
retlocator.setClientFailureCheckPeriod(TemporaryQueueTest.CONNECTION_TTL / 3);
return retlocator;
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
12 years, 2 months
JBoss hornetq SVN: r12267 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-08 11:06:28 -0500 (Thu, 08 Mar 2012)
New Revision: 12267
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
Log:
Fix hanging test: the reverted code would never return (and paged messages cannot be browsed).
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java 2012-03-08 16:06:09 UTC (rev 12266)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java 2012-03-08 16:06:28 UTC (rev 12267)
@@ -20,12 +20,6 @@
@Override
public void testFailWithBrowser() throws Exception
{
- int i = 0;
- while(true)
- {
- testCreateNewFactoryAfterFailover();
- tearDown();
- setUp();
- }
+ // paged messages are not available for browsing
}
}
12 years, 2 months
JBoss hornetq SVN: r12266 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-08 11:06:09 -0500 (Thu, 08 Mar 2012)
New Revision: 12266
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java
Log:
HORNETQ-720 Fix hang on test delaying 'in-sync' message for Backup.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2012-03-08 16:05:48 UTC (rev 12265)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2012-03-08 16:06:09 UTC (rev 12266)
@@ -543,8 +543,8 @@
}
System.out.println("received.size() = " + received.size());
session.close();
-
- Assert.assertTrue(retry <= 5);
+ final int retryLimit = 5;
+ Assert.assertTrue("Number of retries (" + retry + ")should be <= " + retryLimit, retry <= retryLimit);
}
private void createClientSessionFactory() throws Exception
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-03-08 16:05:48 UTC (rev 12265)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-03-08 16:06:09 UTC (rev 12266)
@@ -110,13 +110,13 @@
super.setUp();
clearData();
createConfigs();
-
-
-
+
+
+
liveServer.setIdentity(this.getClass().getSimpleName() + "/liveServer");
liveServer.start();
-
+
waitForServer(liveServer.getServer());
if (backupServer != null)
@@ -376,7 +376,7 @@
}
- protected void crash(final ClientSession... sessions) throws Exception
+ protected void crash(final ClientSession... sessions) throws Exception
{
liveServer.crash(sessions);
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java 2012-03-08 16:05:48 UTC (rev 12265)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java 2012-03-08 16:06:09 UTC (rev 12266)
@@ -26,6 +26,14 @@
}
@Override
+ protected void crash(boolean waitFailure, ClientSession... sessions) throws Exception
+ {
+ syncDelay.deliverUpToDateMsg();
+ waitForBackup(null, 5);
+ super.crash(waitFailure, sessions);
+ }
+
+ @Override
protected void tearDown() throws Exception
{
syncDelay.deliverUpToDateMsg();
12 years, 2 months
JBoss hornetq SVN: r12265 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-08 11:05:48 -0500 (Thu, 08 Mar 2012)
New Revision: 12265
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
Log:
Remove dead code
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java 2012-03-08 16:05:21 UTC (rev 12264)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java 2012-03-08 16:05:48 UTC (rev 12265)
@@ -50,7 +50,7 @@
// Attributes ----------------------------------------------------
final LargeMessageController bufferDelegate;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -64,7 +64,7 @@
// Public --------------------------------------------------------
/**
- *
+ *
*/
public void discardUnusedPackets()
{
@@ -102,35 +102,22 @@
}
/**
- *
+ *
* @param timeWait Milliseconds to Wait. 0 means forever
- * @throws Exception
*/
public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
{
return bufferDelegate.waitCompletion(timeWait);
}
- // Channel Buffer Implementation ---------------------------------
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.buffers.ChannelBuffer#array()
- */
- public byte[] array()
- {
- throw new IllegalAccessError("array not supported on LargeMessageBufferImpl");
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.buffers.ChannelBuffer#capacity()
- */
+ @Override
public int capacity()
{
return -1;
}
DataInputStream dataInput = null;
-
+
private DataInputStream getStream()
{
if (dataInput == null)
@@ -138,18 +125,18 @@
try
{
InputStream input = new HornetQBufferInputStream(bufferDelegate);
-
+
dataInput = new DataInputStream(new InflaterReader(input));
}
catch (Exception e)
{
throw new RuntimeException (e.getMessage(), e);
}
-
+
}
return dataInput;
}
-
+
private void positioningNotSupported()
{
throw new IllegalStateException("Position not supported over compressed large messages");
@@ -298,9 +285,9 @@
positioningNotSupported();
return 0;
}
-
-
+
+
public int getUnsignedMedium(final long index)
{
positioningNotSupported();
@@ -577,7 +564,7 @@
{
try
{
- return (short)getStream().readShort();
+ return getStream().readShort();
}
catch (Exception e)
{
@@ -589,7 +576,7 @@
{
try
{
- return (int)getStream().readUnsignedShort();
+ return getStream().readUnsignedShort();
}
catch (Exception e)
{
@@ -607,7 +594,7 @@
return value;
}
-
+
public int readUnsignedMedium()
{
return (readByte() & 0xff) << 16 | (readByte() & 0xff) << 8 | (readByte() & 0xff) << 0;
@@ -706,7 +693,7 @@
public void skipBytes(final int length)
{
-
+
try
{
for (int i = 0 ; i < length; i++)
@@ -825,7 +812,7 @@
{
return (char)readShort();
}
-
+
public char getChar(final int index)
{
return (char)getShort(index);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java 2012-03-08 16:05:21 UTC (rev 12264)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java 2012-03-08 16:05:48 UTC (rev 12265)
@@ -140,7 +140,7 @@
{
ArrayList<ClientMessage> list = new ArrayList<ClientMessage>();
- private CountDownLatch latch;
+ private final CountDownLatch latch;
private final boolean acknowledge;
@@ -166,11 +166,5 @@
}
latch.countDown();
}
-
- public void reset(final CountDownLatch latch)
- {
- list.clear();
- this.latch = latch;
- }
}
}
12 years, 2 months
JBoss hornetq SVN: r12264 - in trunk/tests: integration-tests/src/test/java/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2012-03-08 11:05:21 -0500 (Thu, 08 Mar 2012)
New Revision: 12264
Modified:
trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
Log:
Organize imports
Modified: trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java
===================================================================
--- trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java 2012-03-08 16:04:45 UTC (rev 12263)
+++ trunk/tests/concurrent-tests/src/test/java/org/hornetq/tests/concurrent/stomp/ConcurrentStompTest.java 2012-03-08 16:05:21 UTC (rev 12264)
@@ -17,25 +17,13 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.protocol.stomp.Stomp;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.TransportConstants;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.tests.integration.stomp.StompTestBase;
-import org.hornetq.tests.util.UnitTestCase;
public class ConcurrentStompTest extends StompTestBase
{
@@ -125,9 +113,9 @@
{
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = socket.getOutputStream();
- for (int i = 0; i < bytes.length; i++)
+ for (byte b : bytes)
{
- outputStream.write(bytes[i]);
+ outputStream.write(b);
}
outputStream.flush();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java 2012-03-08 16:04:45 UTC (rev 12263)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/NIOvsOIOTest.java 2012-03-08 16:05:21 UTC (rev 12264)
@@ -19,12 +19,16 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import junit.framework.TestSuite;
-
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
@@ -53,153 +57,153 @@
{
testPerf(true);
}
-
+
public void testOIOPerf() throws Exception
{
testPerf(false);
}
-
+
private void doTest(String dest) throws Exception
{
-
+
final int numSenders = 1;
final int numReceivers = 1;
-
+
final int numMessages = 20000;
-
+
Receiver[] receivers = new Receiver[numReceivers];
-
+
Sender[] senders = new Sender[numSenders];
-
+
List<ClientSessionFactory> factories = new ArrayList<ClientSessionFactory>();
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
-
+
for (int i = 0; i < numReceivers; i++)
{
ClientSessionFactory sf = locator.createSessionFactory();
-
+
factories.add(sf);
receivers[i] = new Receiver(i, sf, numMessages * numSenders, dest);
-
+
receivers[i].prepare();
-
+
receivers[i].start();
}
-
+
for (int i = 0; i < numSenders; i++)
{
ClientSessionFactory sf = locator.createSessionFactory();
factories.add(sf);
-
+
senders[i] = new Sender(i, sf, numMessages, dest);
-
+
senders[i].prepare();
}
-
+
long start = System.currentTimeMillis();
-
+
for (int i = 0; i < numSenders; i++)
- {
+ {
senders[i].start();
}
-
+
for (int i = 0; i < numSenders; i++)
- {
+ {
senders[i].join();
}
-
+
for (int i = 0; i < numReceivers; i++)
- {
+ {
receivers[i].await();
}
-
+
long end = System.currentTimeMillis();
-
+
double rate = 1000 * (double)(numMessages * numSenders) / (end - start);
-
+
logAndSystemOut("Rate is " + rate + " msgs sec");
-
+
for (int i = 0; i < numSenders; i++)
- {
+ {
senders[i].terminate();
}
-
+
for (int i = 0; i < numReceivers; i++)
- {
+ {
receivers[i].terminate();
}
-
+
for (ClientSessionFactory sf: factories)
- {
+ {
sf.close();
}
-
+
locator.close();
}
private void testPerf(boolean nio) throws Exception
{
String acceptorFactoryClassName = "org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory";
-
+
Configuration conf = createDefaultConfig();
conf.setSecurityEnabled(false);
-
+
Map<String, Object> params = new HashMap<String, Object>();
-
+
params.put(TransportConstants.USE_NIO_PROP_NAME, nio);
conf.getAcceptorConfigurations().add(new TransportConfiguration(acceptorFactoryClassName, params));
HornetQServer server = HornetQServers.newHornetQServer(conf, false);
-
+
AddressSettings addressSettings = new AddressSettings();
-
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-
+
addressSettings.setMaxSizeBytes(10 * 1024 * 1024);
-
+
final String dest = "test-destination";
-
+
HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
-
+
repos.addMatch(dest, addressSettings);
server.start();
-
+
for (int i = 0; i < 2; i++)
{
doTest(dest);
- }
+ }
server.stop();
}
private class Sender extends Thread
{
- private ClientSessionFactory sf;
+ private final ClientSessionFactory sf;
- private int numMessages;
-
- private String dest;
+ private final int numMessages;
+ private final String dest;
+
private ClientSession session;
private ClientProducer producer;
-
- private int id;
+ private final int id;
+
Sender(int id, ClientSessionFactory sf, final int numMessages, final String dest)
{
this.id = id;
-
+
this.sf = sf;
-
+
this.numMessages = numMessages;
this.dest = dest;
@@ -212,10 +216,11 @@
producer = session.createProducer(dest);
}
+ @Override
public void run()
{
ClientMessage msg = session.createMessage(false);
-
+
for (int i = 0; i < numMessages; i++)
{
try
@@ -226,12 +231,12 @@
{
log.error("Caught exception", e);
}
-
+
//log.info(id + " sent message " + i);
-
+
}
}
-
+
public void terminate() throws Exception
{
session.close();
@@ -240,26 +245,26 @@
private class Receiver implements MessageHandler
{
- private ClientSessionFactory sf;
+ private final ClientSessionFactory sf;
- private int numMessages;
+ private final int numMessages;
- private String dest;
+ private final String dest;
private ClientSession session;
private ClientConsumer consumer;
-
- private int id;
-
+
+ private final int id;
+
private String queueName;
Receiver(int id, ClientSessionFactory sf, final int numMessages, final String dest)
{
this.id = id;
-
+
this.sf = sf;
-
+
this.numMessages = numMessages;
this.dest = dest;
@@ -283,7 +288,7 @@
session.start();
}
- private CountDownLatch latch = new CountDownLatch(1);
+ private final CountDownLatch latch = new CountDownLatch(1);
void await() throws Exception
{
@@ -302,24 +307,24 @@
{
log.error("Caught exception", e);
}
-
+
count++;
-
+
if (count == numMessages)
{
latch.countDown();
}
-
+
//log.info(id + " got msg " + count);
-
+
}
-
+
public void terminate() throws Exception
{
consumer.close();
-
+
session.deleteQueue(queueName);
-
+
session.close();
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java 2012-03-08 16:04:45 UTC (rev 12263)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java 2012-03-08 16:05:21 UTC (rev 12264)
@@ -21,11 +21,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.tests.util.RandomUtil;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java 2012-03-08 16:04:45 UTC (rev 12263)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java 2012-03-08 16:05:21 UTC (rev 12264)
@@ -17,12 +17,15 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -32,7 +35,7 @@
* A SessionClosedOnRemotingConnectionFailureTest
*
* @author Tim Fox
-
+
*/
public class SessionClosedOnRemotingConnectionFailureTest extends UnitTestCase
{
12 years, 2 months