JBoss hornetq SVN: r12283 - trunk/hornetq-core/src/main/java/org/hornetq/api/core/client.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-09 17:39:17 -0500 (Fri, 09 Mar 2012)
New Revision: 12283
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java
Log:
JBPAPP-8366 & JBPAPP-8377 - fixing leaks and duplicated resources
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java 2012-03-09 21:11:33 UTC (rev 12282)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/HornetQClient.java 2012-03-09 22:39:17 UTC (rev 12283)
@@ -115,6 +115,19 @@
}
/**
+ * Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
+ * as the cluster topology changes, and no HA backup information is propagated to the client
+ *
+ * @param ha The Locator will support topology updates and ha (this required the server to be clustered, otherwise the first connection will timeout)
+ * @param transportConfigurations
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, TransportConfiguration... transportConfigurations)
+ {
+ return new ServerLocatorImpl(ha, transportConfigurations);
+ }
+
+ /**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
* The UDP address and port are used to listen for live servers in the cluster
@@ -128,7 +141,23 @@
return new ServerLocatorImpl(false, groupConfiguration);
}
+
/**
+ * Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
+ *
+ * The UDP address and port are used to listen for live servers in the cluster
+ *
+ * @param ha The Locator will support topology updates and ha (this required the server to be clustered, otherwise the first connection will timeout)
+ * @param discoveryAddress The UDP group address to listen for updates
+ * @param discoveryPort the UDP port to listen for updates
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ return new ServerLocatorImpl(ha, groupConfiguration);
+ }
+
+ /**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers
* leave or join and new backups are appointed or removed.
* <p>
12 years, 2 months
JBoss hornetq SVN: r12282 - in branches/Branch_2_2_AS7/src/main/org/hornetq: jms/server/recovery and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-03-09 16:11:33 -0500 (Fri, 09 Mar 2012)
New Revision: 12282
Added:
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
JBPAPP-8366 & JBPAPP-8377 - fixing leaks and duplicated resource on TM
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/client/HornetQClient.java 2012-03-09 21:09:17 UTC (rev 12281)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/api/core/client/HornetQClient.java 2012-03-09 21:11:33 UTC (rev 12282)
@@ -113,6 +113,18 @@
}
/**
+ * Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
+ * as the cluster topology changes, and no HA backup information is propagated to the client
+ *
+ * @param transportConfigurations
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, TransportConfiguration... transportConfigurations)
+ {
+ return new ServerLocatorImpl(ha, transportConfigurations);
+ }
+
+ /**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
* The UDP address and port are used to listen for live servers in the cluster
@@ -127,6 +139,20 @@
}
/**
+ * Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
+ *
+ * The UDP address and port are used to listen for live servers in the cluster
+ *
+ * @param discoveryAddress The UDP group address to listen for updates
+ * @param discoveryPort the UDP port to listen for updates
+ * @return the ServerLocator
+ */
+ public static ServerLocator createServerLocator(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ return new ServerLocatorImpl(ha, groupConfiguration);
+ }
+
+ /**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The initial list of servers supplied in this method is simply to make an initial connection to the cluster, once that connection is made, up to date
* cluster topology information is downloaded and automatically updated whenever the cluster topology changes. If the topology includes backup servers
Added: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java (rev 0)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-09 21:11:33 UTC (rev 12282)
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.recovery;
+
+import java.util.HashMap;
+
+import org.hornetq.core.logging.Logger;
+import org.jboss.tm.XAResourceRecoveryRegistry;
+
+/**
+ * This class is a base class for the integration layer where
+ * we verify if a given connection factory already have a recovery registered
+ *
+ * @author Clebert
+ *
+ *
+ */
+public abstract class HornetQRegistryBase implements RecoveryRegistry
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(HornetQRegistryBase.class);
+
+ // Attributes ----------------------------------------------------
+
+ private static HashMap<XARecoveryConfig, HornetQResourceRecovery> configSet = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public abstract XAResourceRecoveryRegistry getTMRegistry();
+
+ public HornetQResourceRecovery register(final HornetQResourceRecovery resourceRecovery)
+ {
+ synchronized (configSet)
+ {
+ HornetQResourceRecovery recovery = configSet.get(resourceRecovery.getConfig());
+
+ if (recovery == null)
+ {
+ recovery = resourceRecovery;
+ if (log.isDebugEnabled())
+ {
+ log.debug("Registering a new recovery for " + recovery.getConfig() + ", recovery = " + resourceRecovery);
+ }
+ configSet.put(resourceRecovery.getConfig(), resourceRecovery);
+ getTMRegistry().addXAResourceRecovery(recovery);
+ }
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Return pre-existent recovery=" + recovery + " for configuration = " + resourceRecovery.getConfig());
+ }
+ }
+ recovery.incrementUsage();
+ return recovery;
+ }
+ }
+
+
+
+ public void unRegister(final HornetQResourceRecovery resourceRecovery)
+ {
+ synchronized (configSet)
+ {
+ HornetQResourceRecovery recFound = configSet.get(resourceRecovery.getConfig());
+
+ if (recFound != null && recFound.decrementUsage() == 0)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Removing recovery information for " + recFound + " as all the deployments were already removed");
+ }
+ getTMRegistry().removeXAResourceRecovery(recFound);
+ configSet.remove(resourceRecovery);
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2012-03-09 21:09:17 UTC (rev 12281)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2012-03-09 21:11:33 UTC (rev 12282)
@@ -32,12 +32,15 @@
public class HornetQResourceRecovery implements XAResourceRecovery
{
private final XARecoveryConfig config;
- private XAResource[] xaResources;
+ private final XAResource[] xaResources;
+
+ private int usage;
+
public HornetQResourceRecovery(XARecoveryConfig config)
{
this.config = config;
- xaResources = new XAResource[]{new HornetQXAResourceWrapper(config)};
+ this.xaResources = new XAResource[] { new HornetQXAResourceWrapper(config) };
}
public XAResource[] getXAResources()
@@ -45,15 +48,36 @@
return xaResources;
}
+ public XARecoveryConfig getConfig()
+ {
+ return config;
+ }
+
+ /** we may have several connection factories referencing the same connection recovery entry.
+ * Because of that we need to make a count of the number of the instances that are referencing it,
+ * so we will remove it as soon as we are done */
+ public synchronized int incrementUsage()
+ {
+ return ++usage;
+ }
+
+ public synchronized int decrementUsage()
+ {
+ return --usage;
+ }
+
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
- HornetQResourceRecovery that = (HornetQResourceRecovery) o;
+ HornetQResourceRecovery that = (HornetQResourceRecovery)o;
- if (config != null ? !config.equals(that.config) : that.config != null) return false;
+ if (config != null ? !config.equals(that.config) : that.config != null)
+ return false;
return true;
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2012-03-09 21:09:17 UTC (rev 12281)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2012-03-09 21:11:33 UTC (rev 12282)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -242,7 +243,7 @@
* @return the connectionFactory
* @throws XAException for any problem
*/
- public XAResource getDelegate(boolean retry) throws XAException
+ private XAResource getDelegate(boolean retry) throws XAException
{
XAResource result = null;
Exception error = null;
@@ -316,7 +317,14 @@
try
{
- serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
+ if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
+ {
+ serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getDiscoveryConfiguration());
+ }
+ else
+ {
+ serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getTransportConfig());
+ }
serverLocator.disableFinalizeCheck();
csf = serverLocator.createSessionFactory();
if (xaRecoveryConfig.getUsername() == null)
@@ -334,10 +342,29 @@
1);
}
}
- catch (HornetQException e)
+ catch (Throwable e)
{
+ log.warn("Can't connect to " + xaRecoveryConfig + " on auto-generated resource recovery", e);
+ if (log.isDebugEnabled())
+ {
+ log.debug(e.getMessage(), e);
+ }
+
+ try
+ {
+ if (cs != null) cs.close();
+ if (serverLocator != null) serverLocator.close();
+ }
+ catch (Throwable ignored)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace(e.getMessage(), ignored);
+ }
+ }
continue;
}
+
cs.addFailureListener(this);
synchronized (HornetQXAResourceWrapper.lock)
@@ -392,9 +419,9 @@
oldServerLocator.close();
}
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
- HornetQXAResourceWrapper.log.trace("Ignored error during close", ignored);
+ HornetQXAResourceWrapper.log.debug("Ignored error during close", ignored);
}
}
@@ -410,10 +437,9 @@
{
log.warn(e.getMessage(), e);
- if (e.errorCode == XAException.XA_RETRY)
- {
- close();
- }
+
+ // If any exception happened, we close the connection so we may start fresh
+ close();
throw e;
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java 2012-03-09 21:09:17 UTC (rev 12281)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java 2012-03-09 21:11:33 UTC (rev 12282)
@@ -28,7 +28,7 @@
*/
public interface RecoveryRegistry
{
- void register(HornetQResourceRecovery resourceRecovery);
+ HornetQResourceRecovery register(HornetQResourceRecovery resourceRecovery);
void unRegister(HornetQResourceRecovery xaRecoveryConfig);
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-09 21:09:17 UTC (rev 12281)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-09 21:11:33 UTC (rev 12282)
@@ -13,10 +13,17 @@
package org.hornetq.jms.server.recovery;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import java.util.Arrays;
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
+
/**
+ *
+ * This represents the configuration of a single connection factory.
+ *
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * @author Clebert Suconic
*
* A wrapper around info needed for the xa recovery resource
* Date: 3/23/11
@@ -24,22 +31,46 @@
*/
public class XARecoveryConfig
{
- private final HornetQConnectionFactory hornetQConnectionFactory;
+
+ private final boolean ha;
+ private final TransportConfiguration[] transportConfiguration;
+ private final DiscoveryGroupConfiguration discoveryConfiguration;
private final String username;
private final String password;
- public XARecoveryConfig(HornetQConnectionFactory hornetQConnectionFactory, String username, String password)
+ public XARecoveryConfig(final boolean ha, final TransportConfiguration[] transportConfiguration, final String username, final String password)
{
- this.hornetQConnectionFactory = hornetQConnectionFactory;
+ this.transportConfiguration = transportConfiguration;
+ this.discoveryConfiguration = null;
this.username = username;
this.password = password;
+ this.ha = ha;
}
- public HornetQConnectionFactory getHornetQConnectionFactory()
+ public XARecoveryConfig(final boolean ha, final DiscoveryGroupConfiguration discoveryConfiguration, final String username, final String password)
{
- return hornetQConnectionFactory;
+ this.discoveryConfiguration = discoveryConfiguration;
+ this.transportConfiguration = null;
+ this.username = username;
+ this.password = password;
+ this.ha = ha;
}
+
+ public boolean isHA()
+ {
+ return ha;
+ }
+ public DiscoveryGroupConfiguration getDiscoveryConfiguration()
+ {
+ return discoveryConfiguration;
+ }
+
+ public TransportConfiguration[] getTransportConfig()
+ {
+ return transportConfiguration;
+ }
+
public String getUsername()
{
return username;
@@ -49,24 +80,44 @@
{
return password;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((discoveryConfiguration == null) ? 0 : discoveryConfiguration.hashCode());
+ result = prime * result + Arrays.hashCode(transportConfiguration);
+ return result;
+ }
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
@Override
- public boolean equals(Object o)
+ public boolean equals(Object obj)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- XARecoveryConfig that = (XARecoveryConfig) o;
-
- if (hornetQConnectionFactory != null ? !hornetQConnectionFactory.equals(that.hornetQConnectionFactory) : that.hornetQConnectionFactory != null)
+ if (this == obj)
+ return true;
+ if (obj == null)
return false;
- if (password != null ? !password.equals(that.password) : that.password != null) return false;
- if (username != null ? !username.equals(that.username) : that.username != null) return false;
-
+ if (getClass() != obj.getClass())
+ return false;
+ XARecoveryConfig other = (XARecoveryConfig)obj;
+ if (discoveryConfiguration == null)
+ {
+ if (other.discoveryConfiguration != null)
+ return false;
+ }
+ else if (!discoveryConfiguration.equals(other.discoveryConfiguration))
+ return false;
+ if (!Arrays.equals(transportConfiguration, other.transportConfiguration))
+ return false;
return true;
}
-
-
/* (non-Javadoc)
* @see java.lang.Object#toString()
@@ -74,20 +125,12 @@
@Override
public String toString()
{
- return "XARecoveryConfig [hornetQConnectionFactory=" + hornetQConnectionFactory +
+ return "XARecoveryConfig [transportConfiguration = " + Arrays.toString(transportConfiguration) +
+ ", discoveryConfiguration = " + discoveryConfiguration +
", username=" +
username +
", password=" +
password +
"]";
}
-
- @Override
- public int hashCode()
- {
- int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
- result = 31 * result + (username != null ? username.hashCode() : 0);
- result = 31 * result + (password != null ? password.hashCode() : 0);
- return result;
- }
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-09 21:09:17 UTC (rev 12281)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-03-09 21:11:33 UTC (rev 12282)
@@ -21,21 +21,18 @@
*/
package org.hornetq.ra.recovery;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.TransportConfiguration;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Set;
+
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
import org.hornetq.jms.server.recovery.RecoveryRegistry;
import org.hornetq.jms.server.recovery.XARecoveryConfig;
-import org.hornetq.ra.Util;
import org.hornetq.utils.ClassloadingUtil;
+import org.hornetq.utils.ConcurrentHashSet;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* 9/21/11
@@ -47,9 +44,9 @@
private RecoveryRegistry registry;
private String resourceRecoveryClassNames = "org.jboss.as.messaging.jms.AS7RecoveryRegistry;org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry";
+
+ private final Set<HornetQResourceRecovery> resources = new ConcurrentHashSet<HornetQResourceRecovery>();
- private Map<XARecoveryConfig, HornetQResourceRecovery> configMap = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
-
public void start()
{
locateRecoveryRegistry();
@@ -59,17 +56,45 @@
{
log.debug("registering recovery for factory : " + factory);
- if(!isRegistered(factory) && registry != null)
+ HornetQResourceRecovery resourceRecovery = newResourceRecovery(factory, userName, password);
+
+ if (registry != null)
{
- XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
- HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
- registry.register(resourceRecovery);
- configMap.put(xaRecoveryConfig, resourceRecovery);
- return resourceRecovery;
+ resourceRecovery = registry.register(resourceRecovery);
+ if (resourceRecovery != null)
+ {
+ resources.add(resourceRecovery);
+ }
}
- return null;
+
+ return resourceRecovery;
}
+ /**
+ * @param factory
+ * @param userName
+ * @param password
+ * @return
+ */
+ private HornetQResourceRecovery newResourceRecovery(HornetQConnectionFactory factory,
+ String userName,
+ String password)
+ {
+ XARecoveryConfig xaRecoveryConfig;
+
+ if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null)
+ {
+ xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password);
+ }
+ else
+ {
+ xaRecoveryConfig = new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password);
+ }
+
+ HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ return resourceRecovery;
+ }
+
public void unRegister(HornetQResourceRecovery resourceRecovery)
{
registry.unRegister(resourceRecovery);
@@ -77,11 +102,11 @@
public void stop()
{
- for (HornetQResourceRecovery hornetQResourceRecovery : configMap.values())
+ for (HornetQResourceRecovery hornetQResourceRecovery : resources)
{
registry.unRegister(hornetQResourceRecovery);
}
- configMap.clear();
+ resources.clear();
}
private void locateRecoveryRegistry()
@@ -108,9 +133,9 @@
{
registry = new RecoveryRegistry()
{
- public void register(HornetQResourceRecovery resourceRecovery)
+ public HornetQResourceRecovery register(HornetQResourceRecovery resourceRecovery)
{
- //no op
+ return null;
}
public void unRegister(HornetQResourceRecovery xaRecoveryConfig)
@@ -125,52 +150,6 @@
}
}
-
- public boolean isRegistered(HornetQConnectionFactory factory)
- {
- for (XARecoveryConfig xaRecoveryConfig : configMap.keySet())
- {
- TransportConfiguration[] transportConfigurations = factory.getServerLocator().getStaticTransportConfigurations();
-
- if (transportConfigurations != null)
- {
- TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getStaticTransportConfigurations();
- if(xaConfigurations == null)
- {
- break;
- }
- if(transportConfigurations.length != xaConfigurations.length)
- {
- break;
- }
- boolean theSame=true;
- for(int i = 0; i < transportConfigurations.length; i++)
- {
- TransportConfiguration tc = transportConfigurations[i];
- TransportConfiguration xaTc = xaConfigurations[i];
- if(!tc.equals(xaTc))
- {
- theSame = false;
- break;
- }
- }
- if(theSame)
- {
- return theSame;
- }
- }
- else
- {
- DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getDiscoveryGroupConfiguration();
- if(discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
- {
- return true;
- }
- }
- }
- return false;
- }
-
/** This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
* utility class, as it would be a door to load anything you like in a safe VM.
* For that reason any class trying to do a privileged block should do with the AccessController directly.
12 years, 2 months
JBoss hornetq SVN: r12281 - branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-03-09 16:09:17 -0500 (Fri, 09 Mar 2012)
New Revision: 12281
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
Log:
format only
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2012-03-09 21:07:12 UTC (rev 12280)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2012-03-09 21:09:17 UTC (rev 12281)
@@ -32,27 +32,27 @@
public class HornetQResourceRecovery implements XAResourceRecovery
{
private final XARecoveryConfig config;
-
+
private final XAResource[] xaResources;
-
+
private int usage;
public HornetQResourceRecovery(XARecoveryConfig config)
{
this.config = config;
- this.xaResources = new XAResource[]{new HornetQXAResourceWrapper(config)};
+ this.xaResources = new XAResource[] { new HornetQXAResourceWrapper(config) };
}
public XAResource[] getXAResources()
{
return xaResources;
}
-
+
public XARecoveryConfig getConfig()
{
return config;
}
-
+
/** we may have several connection factories referencing the same connection recovery entry.
* Because of that we need to make a count of the number of the instances that are referencing it,
* so we will remove it as soon as we are done */
@@ -60,7 +60,7 @@
{
return ++usage;
}
-
+
public synchronized int decrementUsage()
{
return --usage;
@@ -69,12 +69,15 @@
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
- HornetQResourceRecovery that = (HornetQResourceRecovery) o;
+ HornetQResourceRecovery that = (HornetQResourceRecovery)o;
- if (config != null ? !config.equals(that.config) : that.config != null) return false;
+ if (config != null ? !config.equals(that.config) : that.config != null)
+ return false;
return true;
}
12 years, 2 months
JBoss hornetq SVN: r12280 - branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-03-09 16:07:12 -0500 (Fri, 09 Mar 2012)
New Revision: 12280
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
Log:
small tweak
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-09 20:56:44 UTC (rev 12279)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQRegistryBase.java 2012-03-09 21:07:12 UTC (rev 12280)
@@ -64,7 +64,7 @@
{
if (log.isDebugEnabled())
{
- log.info("Return pre-existent recovery=" + recovery + " for configuration = " + resourceRecovery.getConfig());
+ log.debug("Return pre-existent recovery=" + recovery + " for configuration = " + resourceRecovery.getConfig());
}
}
recovery.incrementUsage();
12 years, 2 months
JBoss hornetq SVN: r12279 - branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-03-09 15:56:44 -0500 (Fri, 09 Mar 2012)
New Revision: 12279
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
Log:
small tweak (no changes at all.. just javadoc)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-09 18:01:22 UTC (rev 12278)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2012-03-09 20:56:44 UTC (rev 12279)
@@ -17,13 +17,13 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.jms.client.HornetQConnectionFactory;
/**
*
* This represents the configuration of a single connection factory.
*
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * @author Clebert Suconic
*
* A wrapper around info needed for the xa recovery resource
* Date: 3/23/11
12 years, 2 months
JBoss hornetq SVN: r12278 - in branches/Branch_2_2_EAP_HORNETQ-787: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-03-09 13:01:22 -0500 (Fri, 09 Mar 2012)
New Revision: 12278
Added:
branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
Modified:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
Log:
[HORNETQ-787] added integration testing, finished import and export
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java 2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -52,6 +52,7 @@
public static final String QUEUE_NAME = "name";
public static final String PROPERTY_TYPE_BOOLEAN = "boolean";
public static final String PROPERTY_TYPE_BYTE = "byte";
+ public static final String PROPERTY_TYPE_BYTES = "bytes";
public static final String PROPERTY_TYPE_SHORT = "short";
public static final String PROPERTY_TYPE_INTEGER = "integer";
public static final String PROPERTY_TYPE_LONG = "long";
@@ -59,5 +60,5 @@
public static final String PROPERTY_TYPE_DOUBLE = "double";
public static final String PROPERTY_TYPE_STRING = "string";
public static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
- public static final int CHUNK = 1000;
+ public static final Long CHUNK = 1000L;
}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java 2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -17,21 +17,37 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientRequestor;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.utils.Base64;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
/**
* @author <a href="mailto:jbertram@redhat.com">Justin Bertram</a>
@@ -48,21 +64,41 @@
ClientSession session;
+ boolean localSession = false;
+
+ Map<String, String> addressMap = new HashMap<String, String>();
+
+ String tempFileName = "";
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public XmlDataReader(String inputFile, String host, String port)
+ public XmlDataReader(InputStream inputStream, ClientSession session)
{
try
{
- reader = XMLInputFactory.newInstance().createXMLStreamReader(new java.io.FileInputStream(inputFile));
+ reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+ this.session = session;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public XmlDataReader(InputStream inputStream, String host, String port)
+ {
+ try
+ {
+ reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
HashMap<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
ClientSessionFactory sf = serverLocator.createSessionFactory();
session = sf.createSession(false, true, true);
+ localSession = true;
}
catch (Exception e)
{
@@ -70,6 +106,11 @@
}
}
+ public XmlDataReader(String inputFile, String host, String port) throws FileNotFoundException
+ {
+ this(new FileInputStream(inputFile), host, port);
+ }
+
// Public --------------------------------------------------------
public static void main(String arg[])
@@ -83,7 +124,7 @@
try
{
XmlDataReader xmlDataReader = new XmlDataReader(arg[0], arg[1], arg[2]);
- xmlDataReader.readXMLData();
+ xmlDataReader.processXml();
}
catch (Exception e)
{
@@ -91,36 +132,42 @@
}
}
- private void readXMLData() throws Exception
+ public void processXml() throws Exception
{
- while (reader.hasNext())
+ try
{
- log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
- if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
+ while (reader.hasNext())
{
- if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
+ log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
+ if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
{
- bindQueue();
+ if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
+ {
+ bindQueue();
+ }
+ if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
+ {
+ processMessage();
+ }
}
- if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
- {
- sendMessage();
- }
+ reader.next();
}
- reader.next();
+ } finally
+ {
+ if (localSession)
+ {
+ session.close();
+ }
}
-
- session.close();
}
- private void sendMessage() throws Exception
+ private void processMessage() throws Exception
{
- byte type = 0;
- boolean isLarge = false;
- byte priority = 0;
- long expiration = 0;
- long timestamp = 0;
-// HashMap<String, Object> properties = new HashMap<String, Object>();
+ Byte type = 0;
+ Byte priority = 0;
+ Long expiration = 0L;
+ Long timestamp = 0L;
+ org.hornetq.utils.UUID userId = null;
ArrayList<String> queues = new ArrayList<String>();
for (int i = 0; i < reader.getAttributeCount(); i++)
@@ -128,31 +175,8 @@
String attributeName = reader.getAttributeLocalName(i);
if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
{
- String value = reader.getAttributeValue(i);
- if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
- {
- type = Message.DEFAULT_TYPE;
- } else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
- {
- type = Message.BYTES_TYPE;
- } else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
- {
- type = Message.MAP_TYPE;
- } else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
- {
- type = Message.OBJECT_TYPE;
- } else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
- {
- type = Message.STREAM_TYPE;
- } else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
- {
- type = Message.TEXT_TYPE;
- }
+ type = getMessageType(reader.getAttributeValue(i));
}
- else if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName))
- {
- isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
- }
else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
{
priority = Byte.parseByte(reader.getAttributeValue(i));
@@ -165,13 +189,16 @@
{
timestamp = Long.parseLong(reader.getAttributeValue(i));
}
+ else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
+ {
+ userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
+ }
}
Message message = session.createMessage(type, true, expiration, timestamp, priority);
+ message.setUserID(userId);
boolean endLoop = false;
-// StringBuilder propertiesString = new StringBuilder();
-// StringBuilder queuesString = new StringBuilder();
while (reader.hasNext())
{
@@ -180,85 +207,15 @@
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
{
- reader.next();
- int start = reader.getTextStart();
- int length = reader.getTextLength();
- message.getBodyBuffer().writeBytes(decode(new String(reader.getTextCharacters(), start, length).trim()));
+ processMessageBody(message);
}
else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
{
- String key = "";
- String value = "";
- String propertyType = "";
-
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
- {
-// propertiesString.append("(" + XmlDataConstants.PROPERTY_NAME + "=" + reader.getAttributeValue(i));
- key = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
- {
-// propertiesString.append("," + XmlDataConstants.PROPERTY_VALUE + "=" + reader.getAttributeValue(i));
- value = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
- {
-// propertiesString.append(", " + XmlDataConstants.PROPERTY_TYPE + "=" + reader.getAttributeValue(i) + ") ");
- propertyType = reader.getAttributeValue(i);
- }
- }
-
- if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
- {
- message.putShortProperty(key, Short.parseShort(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
- {
- message.putBooleanProperty(key, Boolean.parseBoolean(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
- {
- message.putByteProperty(key, Byte.parseByte(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
- {
- message.putDoubleProperty(key, Double.parseDouble(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
- {
- message.putFloatProperty(key, Float.parseFloat(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
- {
- message.putIntProperty(key, Integer.parseInt(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
- {
- message.putLongProperty(key, Long.parseLong(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
- {
- message.putStringProperty(new SimpleString(key), new SimpleString(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
- {
- message.putStringProperty(key, value);
- }
+ processMessageProperties(message);
}
else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
{
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.QUEUE_NAME.equals(attributeName))
- {
-// queuesString.append("(" + XmlDataConstants.QUEUE_NAME + "=" + reader.getAttributeValue(i) + ")");
- queues.add(reader.getAttributeValue(i));
- }
- }
+ processMessageQueues(queues);
}
break;
case XMLStreamConstants.END_ELEMENT:
@@ -275,17 +232,191 @@
reader.next();
}
- for(String queueName : queues)
+ sendMessage(queues, message);
+ }
+
+ private Byte getMessageType(String value)
+ {
+ Byte type = Message.DEFAULT_TYPE;
+ if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
{
- System.out.println("To " + queueName + ": " + message);
- ClientProducer producer = session.createProducer(queueName);
- producer.send(message);
+ type = Message.DEFAULT_TYPE;
}
+ else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
+ {
+ type = Message.BYTES_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
+ {
+ type = Message.MAP_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
+ {
+ type = Message.OBJECT_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
+ {
+ type = Message.STREAM_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
+ {
+ type = Message.TEXT_TYPE;
+ }
+ return type;
+ }
-// System.out.println("Sending message (type=" + type + ", isLarge=" + isLarge + ", priority=" + priority + ", expiration=" + expiration + ", timestamp=" + timestamp + ", properties=" + properties + ", queues=" + queues + ", body=" + new String(decode(body.trim())) + ")");
+ private void sendMessage(ArrayList<String> queues, Message message) throws Exception
+ {
+// System.out.print("To " + addressMap.get(queues.get(0)) + ": " + message + " (routed to: ");
+ ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
+ for (String queue : queues)
+ {
+ ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
+ ClientMessage managementMessage = session.createMessage(false);
+ ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
+ session.start();
+ ClientMessage reply = requestor.request(managementMessage);
+ long queueID = (Integer) ManagementHelper.getResult(reply);
+// System.out.print(queue + ", ");
+ session.queueQuery(new SimpleString(queue));
+ buffer.putLong(queueID);
+ }
+// System.out.println(")");
+ message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ ClientProducer producer = session.createProducer(addressMap.get(queues.get(0)));
+ producer.send(message);
+ producer.close();
+ File tempFile = new File(tempFileName);
+ if (!tempFile.delete())
+ {
+ System.err.println("Couldn't delete " + tempFileName);
+ }
}
+ private void processMessageQueues(ArrayList<String> queues)
+ {
+ for (int i = 0; i < reader.getAttributeCount(); i++)
+ {
+ if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
+ {
+ queues.add(reader.getAttributeValue(i));
+ }
+ }
+ }
+
+ private void processMessageProperties(Message message)
+ {
+ String key = "";
+ String value = "";
+ String propertyType = "";
+
+ for (int i = 0; i < reader.getAttributeCount(); i++)
+ {
+ String attributeName = reader.getAttributeLocalName(i);
+ if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
+ {
+ key = reader.getAttributeValue(i);
+ }
+ else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
+ {
+ value = reader.getAttributeValue(i);
+ }
+ else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
+ {
+ propertyType = reader.getAttributeValue(i);
+ }
+ }
+
+ if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
+ {
+ message.putShortProperty(key, Short.parseShort(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
+ {
+ message.putBooleanProperty(key, Boolean.parseBoolean(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
+ {
+ message.putByteProperty(key, Byte.parseByte(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
+ {
+ message.putBytesProperty(key, decode(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
+ {
+ message.putDoubleProperty(key, Double.parseDouble(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
+ {
+ message.putFloatProperty(key, Float.parseFloat(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
+ {
+ message.putIntProperty(key, Integer.parseInt(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
+ {
+ message.putLongProperty(key, Long.parseLong(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
+ {
+ message.putStringProperty(new SimpleString(key), new SimpleString(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
+ {
+ message.putStringProperty(key, value);
+ }
+ }
+
+ private void processMessageBody(Message message) throws XMLStreamException, IOException
+ {
+ boolean isLarge = false;
+
+ if (reader.getAttributeCount() > 0)
+ {
+ isLarge = Boolean.parseBoolean(reader.getAttributeValue(0));
+ }
+ reader.next();
+ if (isLarge)
+ {
+ tempFileName = UUID.randomUUID().toString() + ".tmp";
+ OutputStream out = new FileOutputStream(tempFileName);
+ while (reader.hasNext())
+ {
+ if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
+ {
+ break;
+ }
+ else
+ {
+// System.out.println("Reading " + reader.getTextLength() + " characters.");
+ String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+ String trimmedCharacters = characters.trim();
+ if (trimmedCharacters.length() > 0)
+ {
+ byte[] data = decode(trimmedCharacters);
+// System.out.println(new String(data));
+// System.out.println("Writing " + data.length + " bytes.");
+ out.write(data);
+ }
+ }
+ reader.next();
+ }
+ out.close();
+ FileInputStream fileInputStream = new FileInputStream(tempFileName);
+ BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+ ((ClientMessage) message).setBodyInputStream(bufferedInput);
+ }
+ else
+ {
+ reader.next(); // step past the "indentation" characters to get to the CDATA
+ String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+ message.getBodyBuffer().writeBytes(decode(characters.trim()));
+ }
+ }
+
private void bindQueue() throws Exception
{
String queueName = "";
@@ -309,9 +440,16 @@
}
}
- session.createQueue(address, queueName, filter, true);
+ ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
- System.out.println("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
+ if (!queueQuery.isExists())
+ {
+ session.createQueue(address, queueName, filter, true);
+ }
+
+ addressMap.put(queueName, address);
+
+// System.out.println("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
// Package protected ---------------------------------------------
@@ -322,7 +460,7 @@
private static byte[] decode(String data)
{
- return Base64.decode(data);
+ return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java 2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -64,12 +64,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.MessageDescribe;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
/**
* @author <a href="mailto:jbertram@redhat.com">Justin Bertram</a>
@@ -118,7 +113,7 @@
return executor;
}
};
-
+
storageManager = new JournalStorageManager(config, executorFactory);
messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
@@ -140,7 +135,8 @@
XMLStreamWriter.class.getClassLoader(),
new Class[]{XMLStreamWriter.class},
handler);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
e.printStackTrace();
}
@@ -157,7 +153,7 @@
}
try
- {
+ {
// long start = System.currentTimeMillis();
XmlDataWriter xmlDataWriter = new XmlDataWriter(System.out, arg[0], arg[1], arg[2], arg[3]);
xmlDataWriter.writeXMLData();
@@ -194,7 +190,7 @@
messageJournal.start();
- ((JournalImpl)messageJournal).load(records, null, null, false);
+ ((JournalImpl) messageJournal).load(records, null, null, false);
for (RecordInfo info : records)
{
@@ -294,7 +290,7 @@
bindingsJournal.start();
- ((JournalImpl)bindingsJournal).load(records, null, null, false);
+ ((JournalImpl) bindingsJournal).load(records, null, null, false);
for (RecordInfo info : records)
{
@@ -456,26 +452,55 @@
{
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
LargeServerMessage largeMessage = (LargeServerMessage) message;
+ BodyEncoder encoder = null;
try
{
- BodyEncoder encoder = largeMessage.getBodyEncoder();
+ encoder = largeMessage.getBodyEncoder();
encoder.open();
- for (long i = 0; i < encoder.getLargeBodySize(); i += XmlDataConstants.CHUNK)
+ long totalBytesWritten = 0;
+ Long bufferSize;
+ long bodySize = encoder.getLargeBodySize();
+ for (long i = 0; i < bodySize; i += XmlDataConstants.CHUNK)
{
- HornetQBuffer buffer = HornetQBuffers.fixedBuffer(XmlDataConstants.CHUNK);
- encoder.encode(buffer, XmlDataConstants.CHUNK);
- xmlWriter.writeCharacters(encode(buffer.toByteBuffer().array()));
+ Long remainder = bodySize - totalBytesWritten;
+ if (remainder >= XmlDataConstants.CHUNK)
+ {
+ bufferSize = XmlDataConstants.CHUNK;
+ }
+ else
+ {
+ bufferSize = remainder;
+ }
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
+ encoder.encode(buffer, bufferSize.intValue());
+ xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
+ totalBytesWritten += bufferSize;
}
+ encoder.close();
}
catch (HornetQException e)
{
e.printStackTrace();
}
+ finally
+ {
+ if (encoder != null)
+ {
+ try
+ {
+ encoder.close();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
}
else
{
- xmlWriter.writeCharacters(encode(message.getBodyBuffer().toByteBuffer().array()));
+ xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
}
xmlWriter.writeEndElement(); // end MESSAGE_BODY
}
@@ -499,7 +524,15 @@
Object value = message.getObjectProperty(key);
xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+ if (value instanceof byte[])
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, encode((byte[]) value));
+ }
+ else
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+ }
+
if (value instanceof Boolean)
{
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
@@ -536,6 +569,10 @@
{
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING);
}
+ else if (value instanceof byte[])
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
+ }
}
xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
}
@@ -626,7 +663,7 @@
target.writeCharacters(LINE_SEPARATOR);
target.writeCharacters(indent(depth));
}
- else if ("writeEmptyElement".equals(m) || "writeCharacters".equals(m))
+ else if ("writeEmptyElement".equals(m) || "writeCData".equals(m))
{
target.writeCharacters(LINE_SEPARATOR);
target.writeCharacters(indent(depth));
Added: branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -0,0 +1,468 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
+import org.hornetq.core.persistence.impl.journal.XmlDataReader;
+import org.hornetq.core.persistence.impl.journal.XmlDataWriter;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * A ExportFormatTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class XmlImportExportTest extends ServiceTestBase
+{
+ public static final int CONSUMER_TIMEOUT = 5000;
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected void tearDown() throws Exception
+ {
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void testMessageProperties() throws Exception
+ {
+ final String QUEUE_NAME = "A1";
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+ ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+ StringBuilder international = new StringBuilder();
+ for (char x = 800; x < 1200; x++)
+ {
+ international.append(x);
+ }
+
+ String special = "\"<>'&";
+
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.getBodyBuffer().writeString("Bob the giant pig " + i);
+ msg.putBooleanProperty("myBooleanProperty", Boolean.TRUE);
+ msg.putByteProperty("myByteProperty", new Byte("0"));
+ msg.putBytesProperty("myBytesProperty", new byte[]{0, 1, 2, 3, 4});
+ msg.putDoubleProperty("myDoubleProperty", i * 1.6);
+ msg.putFloatProperty("myFloatProperty", i * 2.5F);
+ msg.putIntProperty("myIntProperty", i);
+ msg.putLongProperty("myLongProperty", Long.MAX_VALUE - i);
+ msg.putObjectProperty("myObjectProperty", i);
+ msg.putShortProperty("myShortProperty", new Integer(i).shortValue());
+ msg.putStringProperty("myStringProperty", "myStringPropertyValue_" + i);
+ msg.putStringProperty("myNonAsciiStringProperty", international.toString());
+ msg.putStringProperty("mySpecialCharacters", special);
+ producer.send(msg);
+ }
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+ session.start();
+
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage msg = consumer.receive(CONSUMER_TIMEOUT);
+ byte[] body = new byte[msg.getBodySize()];
+ msg.getBodyBuffer().readBytes(body);
+ Assert.assertTrue(new String(body).contains("Bob the giant pig " + i));
+ Assert.assertEquals(msg.getBooleanProperty("myBooleanProperty"), Boolean.TRUE);
+ Assert.assertEquals(msg.getByteProperty("myByteProperty"), new Byte("0"));
+ byte[] bytes = msg.getBytesProperty("myBytesProperty");
+ for (int j = 0; j < 5; j++)
+ {
+ Assert.assertEquals(j, bytes[j]);
+ }
+ Assert.assertEquals(i * 1.6, msg.getDoubleProperty("myDoubleProperty"));
+ Assert.assertEquals(i * 2.5F, msg.getFloatProperty("myFloatProperty"));
+ Assert.assertEquals(i, msg.getIntProperty("myIntProperty").intValue());
+ Assert.assertEquals(Long.MAX_VALUE - i, msg.getLongProperty("myLongProperty").longValue());
+ Assert.assertEquals(i, msg.getObjectProperty("myObjectProperty"));
+ Assert.assertEquals(new Integer(i).shortValue(), msg.getShortProperty("myShortProperty").shortValue());
+ Assert.assertEquals("myStringPropertyValue_" + i, msg.getStringProperty("myStringProperty"));
+ Assert.assertEquals(international.toString(), msg.getStringProperty("myNonAsciiStringProperty"));
+ Assert.assertEquals(special, msg.getStringProperty("mySpecialCharacters"));
+ }
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ public void testMessageTypes() throws Exception
+ {
+ final String QUEUE_NAME = "A1";
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+ ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+
+ ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
+ producer.send(msg);
+ msg = session.createMessage(Message.DEFAULT_TYPE, true);
+ producer.send(msg);
+ msg = session.createMessage(Message.MAP_TYPE, true);
+ producer.send(msg);
+ msg = session.createMessage(Message.OBJECT_TYPE, true);
+ producer.send(msg);
+ msg = session.createMessage(Message.STREAM_TYPE, true);
+ producer.send(msg);
+ msg = session.createMessage(Message.TEXT_TYPE, true);
+ producer.send(msg);
+ msg = session.createMessage(true);
+ producer.send(msg);
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+ session.start();
+
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Message.BYTES_TYPE, msg.getType());
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Message.DEFAULT_TYPE, msg.getType());
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Message.MAP_TYPE, msg.getType());
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Message.OBJECT_TYPE, msg.getType());
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Message.STREAM_TYPE, msg.getType());
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Message.TEXT_TYPE, msg.getType());
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Message.DEFAULT_TYPE, msg.getType());
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ public void testMessageAttributes() throws Exception
+ {
+ final String QUEUE_NAME = "A1";
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+ ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+ ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
+ msg.setExpiration(Long.MAX_VALUE);
+ msg.setPriority((byte) 0);
+ msg.setTimestamp(Long.MAX_VALUE - 1);
+ msg.setUserID(UUIDGenerator.getInstance().generateUUID());
+ producer.send(msg);
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+ session.start();
+
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertEquals(Long.MAX_VALUE, msg.getExpiration());
+ Assert.assertEquals((byte) 0, msg.getPriority());
+ Assert.assertEquals(Long.MAX_VALUE - 1, msg.getTimestamp());
+ Assert.assertNotNull(msg.getUserID());
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ public void testBindingAttributes() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue("addressName1", "queueName1");
+ session.createQueue("addressName1", "queueName2", "bob", true);
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+
+ ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
+
+ Assert.assertEquals("addressName1", queueQuery.getAddress().toString());
+ Assert.assertNull(queueQuery.getFilterString());
+
+ queueQuery = session.queueQuery(new SimpleString("queueName2"));
+
+ Assert.assertEquals("addressName1", queueQuery.getAddress().toString());
+ Assert.assertEquals("bob", queueQuery.getFilterString().toString());
+ Assert.assertEquals(Boolean.TRUE.booleanValue(), queueQuery.isDurable());
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ public void testLargeMessage() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, false);
+
+ try
+ {
+ LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
+
+ fileMessage.setMessageID(1005);
+ fileMessage.setDurable(true);
+
+ for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+ {
+ fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
+ }
+
+ fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+// fileMessage.releaseResources();
+
+ session.createQueue("A", "A");
+
+ ClientProducer prod = session.createProducer("A");
+
+ prod.send(fileMessage);
+
+ fileMessage.deleteFile();
+
+ session.commit();
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createFactory(false);
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ session.close();
+ session = factory.createSession(false, false);
+ session.start();
+
+ ClientConsumer cons = session.createConsumer("A");
+
+ ClientMessage msg = cons.receive(CONSUMER_TIMEOUT);
+
+ Assert.assertNotNull(msg);
+
+ Assert.assertEquals(2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
+
+ for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+ {
+ Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
+ }
+
+ msg.acknowledge();
+ session.commit();
+ }
+ finally
+ {
+ session.close();
+ factory.close();
+ locator.close();
+ server.stop();
+ }
+ }
+
+ public void testPartialQueue() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue("myAddress", "myQueue1");
+ session.createQueue("myAddress", "myQueue2");
+
+ ClientProducer producer = session.createProducer("myAddress");
+
+ ClientMessage msg = session.createMessage(true);
+ producer.send(msg);
+
+ ClientConsumer consumer = session.createConsumer("myQueue1");
+ session.start();
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertNotNull(msg);
+ msg.acknowledge();
+ consumer.close();
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ consumer = session.createConsumer("myQueue1");
+ session.start();
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertNull(msg);
+ consumer.close();
+
+ consumer = session.createConsumer("myQueue2");
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertNotNull(msg);
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
12 years, 2 months
JBoss hornetq SVN: r12277 - branches/Branch_2_2_AS7.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-09 12:55:33 -0500 (Fri, 09 Mar 2012)
New Revision: 12277
Modified:
branches/Branch_2_2_AS7/build-hornetq.xml
Log:
build fix for distro
Modified: branches/Branch_2_2_AS7/build-hornetq.xml
===================================================================
--- branches/Branch_2_2_AS7/build-hornetq.xml 2012-03-09 00:02:42 UTC (rev 12276)
+++ branches/Branch_2_2_AS7/build-hornetq.xml 2012-03-09 17:55:33 UTC (rev 12277)
@@ -757,19 +757,19 @@
</schemavalidate>
<!-- AS 4 configuration validation -->
- <antcall target="-validate-configuration">
+ <!-- <antcall target="-validate-configuration">
<param name="conf.dir" value="${src.config.jbossas4.non-clustered.dir}"/>
</antcall>
<antcall target="-validate-configuration">
<param name="conf.dir" value="${src.config.jbossas4.clustered.dir}"/>
- </antcall>
+ </antcall> -->
<!-- AS 5 configuration validation -->
- <antcall target="-validate-configuration">
+ <!-- <antcall target="-validate-configuration">
<param name="conf.dir" value="${src.config.jbossas5.non-clustered.dir}"/>
</antcall>
<antcall target="-validate-configuration">
<param name="conf.dir" value="${src.config.jbossas5.clustered.dir}"/>
- </antcall>
+ </antcall> -->
</target>
<target name="-validate-configuration">
12 years, 2 months
JBoss hornetq SVN: r12276 - branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-08 19:02:42 -0500 (Thu, 08 Mar 2012)
New Revision: 12276
Modified:
branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml
Log:
fixing typo
Modified: branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml
===================================================================
--- branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml 2012-03-08 23:58:05 UTC (rev 12275)
+++ branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/hornetq-rest/pom.xml 2012-03-09 00:02:42 UTC (rev 12276)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.10.EAP.GA-JBPAPP_6277</hornetq.version>
+ <hornetq.version>2.2.10.EAP.GA-JBPAPP_8396</hornetq.version>
</properties>
<licenses>
12 years, 2 months
JBoss hornetq SVN: r12275 - branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-08 18:58:05 -0500 (Thu, 08 Mar 2012)
New Revision: 12275
Modified:
branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties
Log:
fixing typo
Modified: branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties
===================================================================
--- branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties 2012-03-08 23:28:24 UTC (rev 12274)
+++ branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/src/config/common/hornetq-version.properties 2012-03-08 23:58:05 UTC (rev 12275)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_10_EAP_GA_JBPAPP_6277
+hornetq.version.versionName=HQ_2_2_10_EAP_GA_JBPAPP_8396
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=10
12 years, 2 months
JBoss hornetq SVN: r12274 - branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-08 18:28:24 -0500 (Thu, 08 Mar 2012)
New Revision: 12274
Modified:
branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml
Log:
JBPAPP-8396 - fixing typo
Modified: branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml
===================================================================
--- branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml 2012-03-08 23:21:53 UTC (rev 12273)
+++ branches/one-offs/HornetQ_2_2_10_EAP_JBPAPP_8396/build-maven.xml 2012-03-08 23:28:24 UTC (rev 12274)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.10.EAP.GA-JBPAPP_6277"/>
+ <property name="hornetq.version" value="2.2.10.EAP.GA-JBPAPP_8396"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
12 years, 2 months