Author: borges
Date: 2011-10-21 11:34:20 -0400 (Fri, 21 Oct 2011)
New Revision: 11576
Added:
trunk/hornetq-core/src/test/java/org/hornetq/core/message/
trunk/hornetq-core/src/test/java/org/hornetq/core/message/impl/
trunk/hornetq-core/src/test/java/org/hornetq/core/message/impl/MessagePropertyTest.java
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Removed:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java
Log:
Undelete MessagePropertyTest, move ServiceTestBase to hornetq-core/src/tests
Added:
trunk/hornetq-core/src/test/java/org/hornetq/core/message/impl/MessagePropertyTest.java
===================================================================
---
trunk/hornetq-core/src/test/java/org/hornetq/core/message/impl/MessagePropertyTest.java
(rev 0)
+++
trunk/hornetq-core/src/test/java/org/hornetq/core/message/impl/MessagePropertyTest.java 2011-10-21
15:34:20 UTC (rev 11576)
@@ -0,0 +1,110 @@
+package org.hornetq.core.message.impl;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+public class MessagePropertyTest extends ServiceTestBase
+{
+ private HornetQServer server;
+ private ServerLocator locator;
+ private ClientSessionFactory sf;
+ private final int numMessages = 20;
+
+ private static final String ADDRESS = "aAddress123";
+ private static final SimpleString SIMPLE_STRING_KEY = new
SimpleString("StringToSimpleString");
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ server = createServer(true);
+ server.start();
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ sf.close();
+ locator.close();
+ server.stop();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ private void sendMessages() throws Exception
+ {
+ ClientSession session = sf.createSession(true, true);
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ setBody(i, message);
+ message.putIntProperty("int", i);
+ message.putShortProperty("short", (short)i);
+ message.putByteProperty("byte", (byte)i);
+ message.putFloatProperty("float", floatValue(i));
+ message.putStringProperty(SIMPLE_STRING_KEY, new
SimpleString(Integer.toString(i)));
+ message.putBytesProperty("byte[]", byteArray(i));
+ producer.send(message);
+ }
+ session.commit();
+ }
+
+ private float floatValue(int i)
+ {
+ return (float)(i * 1.3);
+ }
+
+ private byte[] byteArray(int i)
+ {
+ return new byte[] { (byte)i, (byte)(i / 2) };
+ }
+
+ public void testProperties() throws Exception
+ {
+ sendMessages();
+ receiveMessages();
+ }
+
+
+ private void receiveMessages() throws Exception
+ {
+ ClientSession session = sf.createSession(true, true);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(100);
+ assertNotNull("Expecting a message " + i, message);
+ assertMessageBody(i, message);
+ assertEquals(i, message.getIntProperty("int").intValue());
+ assertEquals((short)i,
message.getShortProperty("short").shortValue());
+ assertEquals((byte)i, message.getByteProperty("byte").byteValue());
+ assertEquals(floatValue(i),
message.getFloatProperty("float").floatValue(), 0.001);
+ assertEquals(new SimpleString(Integer.toString(i)),
+ message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
+ assertEqualsByteArrays(byteArray(i),
message.getBytesProperty("byte[]"));
+ message.acknowledge();
+ }
+ assertNull("no more messages", consumer.receive(50));
+ consumer.close();
+ session.commit();
+ }
+
+}
Copied: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java (from
rev 11574,
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java)
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
(rev 0)
+++
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-10-21
15:34:20 UTC (rev 11576)
@@ -0,0 +1,810 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.util;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ *
+ * Base class with basic utilities on starting up a basic server
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ *
+ */
+public abstract class ServiceTestBase extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ protected static final long WAIT_TIMEOUT = 10000;
+
+
+ // Attributes ----------------------------------------------------
+
+ protected static final String INVM_ACCEPTOR_FACTORY =
InVMAcceptorFactory.class.getCanonicalName();
+
+ public static final String INVM_CONNECTOR_FACTORY =
InVMConnectorFactory.class.getCanonicalName();
+
+ protected static final String NETTY_ACCEPTOR_FACTORY =
NettyAcceptorFactory.class.getCanonicalName();
+
+ protected static final String NETTY_CONNECTOR_FACTORY =
NettyConnectorFactory.class.getCanonicalName();
+
+ private final List<ServerLocator> locators = new
ArrayList<ServerLocator>();
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ for (ServerLocator locator : locators)
+ {
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ locators.clear();
+ super.tearDown();
+// checkFreePort(5445);
+// checkFreePort(5446);
+// checkFreePort(5447);
+ if (InVMRegistry.instance.size() > 0)
+ {
+ fail("InVMREgistry size > 0");
+ }
+ }
+
+ public static final void closeServerLocator(ServerLocator locator)
+ {
+ if (locator == null)
+ return;
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ protected void waitForTopology(final HornetQServer server, final int nodes) throws
Exception
+ {
+ waitForTopology(server, nodes, WAIT_TIMEOUT);
+ }
+
+ protected void waitForTopology(final HornetQServer server, final int nodes, final long
timeout) throws Exception
+ {
+ log.debug("waiting for " + nodes + " on the topology for server =
" + server);
+
+ long start = System.currentTimeMillis();
+
+ Set<ClusterConnection> ccs =
server.getClusterManager().getClusterConnections();
+
+ if (ccs.size() != 1)
+ {
+ throw new IllegalStateException("You need a single cluster connection on
this version of waitForTopology on ServiceTestBase");
+ }
+
+ Topology topology = ccs.iterator().next().getTopology();
+
+ do
+ {
+ if (nodes == topology.getMembers().size())
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < timeout);
+
+ String msg = "Timed out waiting for cluster topology of " + nodes +
+ " (received " +
+ topology.getMembers().size() +
+ ") topology = " +
+ topology +
+ ")";
+
+ log.error(msg);
+
+ throw new Exception(msg);
+ }
+
+
+ protected void waitForTopology(final HornetQServer server, String
clusterConnectionName, final int nodes, final long timeout) throws Exception
+ {
+ log.debug("waiting for " + nodes + " on the topology for server =
" + server);
+
+ long start = System.currentTimeMillis();
+
+ ClusterConnection clusterConnection =
server.getClusterManager().getClusterConnection(clusterConnectionName);
+
+
+ Topology topology = clusterConnection.getTopology();
+
+ do
+ {
+ if (nodes == topology.getMembers().size())
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < timeout);
+
+ String msg = "Timed out waiting for cluster topology of " + nodes +
+ " (received " +
+ topology.getMembers().size() +
+ ") topology = " +
+ topology +
+ ")";
+
+ log.error(msg);
+
+ throw new Exception(msg);
+ }
+
+ protected final static void waitForComponent(final HornetQComponent component, final
long seconds) throws Exception
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (!component.isStarted())
+ {
+ try
+ {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("component did not start within timeout of " + seconds);
+ }
+ }
+ }
+
+ protected static final void stopComponent(HornetQComponent component)
+ {
+ if (component == null)
+ return;
+ if (component.isStarted())
+ try
+ {
+ component.stop();
+ }
+ catch (Exception e)
+ {
+ // no-op
+ }
+ }
+
+ protected static Map<String, Object> generateParams(final int node, final
boolean netty)
+ {
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ if (netty)
+ {
+
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
+ org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT
+ node);
+ }
+ else
+ {
+
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME,
node);
+ }
+
+ return params;
+ }
+
+ protected static TransportConfiguration createTransportConfiguration(boolean netty,
+ boolean
acceptor,
+ Map<String,
Object> params)
+ {
+ String className;
+ if (netty)
+ {
+ if (acceptor)
+ {
+ className = NettyAcceptorFactory.class.getName();
+ }
+ else
+ {
+ className = NettyConnectorFactory.class.getName();
+ }
+ }
+ else
+ {
+ if (acceptor)
+ {
+ className = InVMAcceptorFactory.class.getName();
+ }
+ else
+ {
+ className = InVMConnectorFactory.class.getName();
+ }
+ }
+ return new TransportConfiguration(className, params);
+ }
+
+ // Static --------------------------------------------------------
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Constructors --------------------------------------------------
+
+ public ServiceTestBase()
+ {
+ super();
+ }
+
+ public ServiceTestBase(final String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void waitForServer(HornetQServer server) throws InterruptedException
+ {
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted() && System.currentTimeMillis() < timetowait)
+ {
+ Thread.sleep(100);
+ }
+
+ if (!server.isStarted())
+ {
+ log.info(threadDump("Server didn't start"));
+ fail("server didnt start");
+ }
+
+ if (!server.getConfiguration().isBackup())
+ {
+ timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isInitialised() && System.currentTimeMillis() <
timetowait)
+ {
+ Thread.sleep(100);
+ }
+
+ if (!server.isInitialised())
+ {
+ fail("Server didn't initialize");
+ }
+ }
+ }
+
+
+ protected HornetQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final Map<String, AddressSettings>
settings,
+ final MBeanServer mbeanServer)
+ {
+ HornetQServer server;
+
+ if (realFiles)
+ {
+ server = HornetQServers.newHornetQServer(configuration, mbeanServer, true);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, mbeanServer, false);
+ }
+
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
+ }
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(pageSize);
+ defaultSetting.setMaxSizeBytes(maxAddressSize);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ protected HornetQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final Map<String, AddressSettings>
settings)
+ {
+ return createServer(realFiles, configuration, pageSize, maxAddressSize,
AddressFullMessagePolicy.PAGE, settings);
+ }
+
+ protected HornetQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final AddressFullMessagePolicy fullPolicy,
+ final Map<String, AddressSettings>
settings)
+ {
+ HornetQServer server;
+
+ if (realFiles)
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+
+ if (settings != null)
+ {
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
+ }
+ }
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(pageSize);
+ defaultSetting.setMaxSizeBytes(maxAddressSize);
+ defaultSetting.setAddressFullMessagePolicy(fullPolicy);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ protected HornetQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final MBeanServer mbeanServer,
+ final Map<String, AddressSettings>
settings)
+ {
+ HornetQServer server;
+
+ if (realFiles)
+ {
+ server = HornetQServers.newHornetQServer(configuration, mbeanServer);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, mbeanServer, false);
+ }
+
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
+ }
+
+ AddressSettings defaultSetting = new AddressSettings();
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ protected HornetQServer createServer(final boolean realFiles)
+ {
+ return createServer(realFiles, false);
+ }
+
+ protected HornetQServer createServer(final boolean realFiles, final boolean netty)
+ {
+ return createServer(realFiles, createDefaultConfig(netty), -1, -1, new
HashMap<String, AddressSettings>());
+ }
+
+ protected HornetQServer createServer(final boolean realFiles, final Configuration
configuration)
+ {
+ return createServer(realFiles, configuration, -1, -1, new HashMap<String,
AddressSettings>());
+ }
+
+ protected HornetQServer createInVMFailoverServer(final boolean realFiles,
+ final Configuration configuration,
+ final NodeManager nodeManager,
+ final int id)
+ {
+ return createInVMFailoverServer(realFiles,
+ configuration,
+ -1,
+ -1,
+ new HashMap<String, AddressSettings>(),
+ nodeManager,
+ id);
+ }
+
+ protected HornetQServer createInVMFailoverServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final Map<String,
AddressSettings> settings,
+ NodeManager nodeManager,
+ final int id)
+ {
+ HornetQServer server;
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+ configuration.setPersistenceEnabled(realFiles);
+ server = new InVMNodeManagerServer(configuration,
+ ManagementFactory.getPlatformMBeanServer(),
+ securityManager,
+ nodeManager);
+
+ server.setIdentity("Server " + id);
+
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
+ }
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(pageSize);
+ defaultSetting.setMaxSizeBytes(maxAddressSize);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ protected HornetQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final HornetQSecurityManager securityManager)
+ {
+ HornetQServer server;
+
+ if (realFiles)
+ {
+ server = HornetQServers.newHornetQServer(configuration,
+
ManagementFactory.getPlatformMBeanServer(),
+ securityManager);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration,
+
ManagementFactory.getPlatformMBeanServer(),
+ securityManager,
+ false);
+ }
+
+ Map<String, AddressSettings> settings = new HashMap<String,
AddressSettings>();
+
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
+ }
+
+ AddressSettings defaultSetting = new AddressSettings();
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
+ final int index,
+ final boolean realFiles,
+ final Map<String,
Object> params)
+ {
+ if (isNetty)
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params,
NETTY_ACCEPTOR_FACTORY),
+ -1,
+ -1,
+ new HashMap<String, AddressSettings>());
+ }
+ else
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params,
INVM_ACCEPTOR_FACTORY),
+ -1,
+ -1,
+ new HashMap<String, AddressSettings>());
+ }
+ }
+
+ protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
+ final int index,
+ final boolean realFiles,
+ final int pageSize,
+ final int maxAddressSize,
+ final Map<String,
Object> params)
+ {
+ if (isNetty)
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params,
NETTY_ACCEPTOR_FACTORY),
+ pageSize,
+ maxAddressSize,
+ new HashMap<String, AddressSettings>());
+ }
+ else
+ {
+ return createServer(realFiles,
+ createClusteredDefaultConfig(index, params,
INVM_ACCEPTOR_FACTORY),
+ -1,
+ -1,
+ new HashMap<String, AddressSettings>());
+ }
+ }
+
+ protected ServerLocator createFactory(final boolean isNetty) throws Exception
+ {
+ if (isNetty)
+ {
+ return createNettyNonHALocator();
+ }
+ else
+ {
+ return createInVMNonHALocator();
+ }
+ }
+
+ protected void createQueue(final String address, final String queue) throws Exception
+ {
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession();
+ session.createQueue(address, queue);
+ session.close();
+ sf.close();
+ locator.close();
+ }
+
+ protected ServerLocator createInVMNonHALocator()
+ {
+ return createNonHALocator(false);
+ }
+
+ protected ServerLocator createNettyNonHALocator()
+ {
+ return createNonHALocator(true);
+ }
+
+ protected ServerLocator createNonHALocator(final boolean isNetty)
+ {
+ ServerLocator locatorWithoutHA = isNetty ?
HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(NETTY_CONNECTOR_FACTORY))
+ :
HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ locators.add(locatorWithoutHA);
+ return locatorWithoutHA;
+ }
+
+ protected ServerLocator createInVMLocator(final int serverID)
+ {
+ TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID,
UUIDGenerator.getInstance().generateStringUUID());
+
+ return HornetQClient.createServerLocatorWithHA(tnspConfig);
+ }
+
+ /**
+ * @param serverID
+ * @return
+ */
+ protected TransportConfiguration createInVMTransportConnectorConfig(final int
serverID, String name)
+ {
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+
+ if (serverID != 0)
+ {
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+ }
+
+ TransportConfiguration tnspConfig = new
TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params, name);
+ return tnspConfig;
+ }
+
+ // XXX unused
+ protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws
Exception
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(connectorClass));
+ return (ClientSessionFactoryImpl)locator.createSessionFactory();
+
+ }
+ public String getTextMessage(final ClientMessage m)
+ {
+ m.getBodyBuffer().resetReaderIndex();
+ return m.getBodyBuffer().readString();
+ }
+
+ protected ClientMessage createBytesMessage(final ClientSession session,
+ final byte type,
+ final byte[] b,
+ final boolean durable)
+ {
+ ClientMessage message = session.createMessage(type, durable, 0,
System.currentTimeMillis(), (byte)1);
+ message.getBodyBuffer().writeBytes(b);
+ return message;
+ }
+
+ /**
+ * @param i
+ * @param message
+ * @throws Exception
+ */
+ protected void setBody(final int i, final ClientMessage message) throws Exception
+ {
+ message.getBodyBuffer().writeString("message" + i);
+ }
+
+ /**
+ * @param i
+ * @param message
+ */
+ protected void assertMessageBody(final int i, final ClientMessage message)
+ {
+ Assert.assertEquals(message.toString(), "message" + i,
message.getBodyBuffer().readString());
+ }
+
+ /**
+ * Send durable messages with pre-specified body.
+ * @param session
+ * @param producer
+ * @param numMessages
+ * @throws Exception
+ */
+ public final void sendMessages(ClientSession session, ClientProducer producer, int
numMessages) throws Exception
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ setBody(i, message);
+ message.putIntProperty("counter", i);
+ producer.send(message);
+ }
+ }
+
+
+ protected final void receiveMessagesAndAck(ClientConsumer consumer, final int start,
int msgCount)
+ throws HornetQException
+ {
+ for (int i = start; i < msgCount; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull("Expecting a message " + i, message);
+ assertMessageBody(i, message);
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ message.acknowledge();
+ }
+ }
+
+ /**
+ * Deleting a file on LargeDire is an asynchronous process. We need to keep looking
for a while
+ * if the file hasn't been deleted yet.
+ */
+ protected void validateNoFilesOnLargeDir(final int expect) throws Exception
+ {
+ File largeMessagesFileDir = new File(getLargeMessagesDir());
+
+ // Deleting the file is async... we keep looking for a period of the time until the
file is really gone
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() &&
largeMessagesFileDir.listFiles().length != expect)
+ {
+ Thread.sleep(100);
+ }
+
+
+ if (expect != largeMessagesFileDir.listFiles().length)
+ {
+ for (File file : largeMessagesFileDir.listFiles())
+ {
+ System.out.println("File " + file + " still on ");
+ }
+ }
+
+ Assert.assertEquals(expect, largeMessagesFileDir.listFiles().length);
+ }
+
+ /**
+ * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking
for a while
+ * if the file hasn't been deleted yet
+ */
+ protected void validateNoFilesOnLargeDir() throws Exception
+ {
+ validateNoFilesOnLargeDir(0);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ class InVMNodeManagerServer extends HornetQServerImpl
+ {
+ final NodeManager nodeManager;
+
+ public InVMNodeManagerServer(final NodeManager nodeManager)
+ {
+ super();
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(final Configuration configuration, final NodeManager
nodeManager)
+ {
+ super(configuration);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(final Configuration configuration,
+ final MBeanServer mbeanServer,
+ final NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(final Configuration configuration,
+ final HornetQSecurityManager securityManager,
+ final NodeManager nodeManager)
+ {
+ super(configuration, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(final Configuration configuration,
+ final MBeanServer mbeanServer,
+ final HornetQSecurityManager securityManager,
+ final NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ @Override
+ protected NodeManager createNodeManager(final String directory)
+ {
+ return nodeManager;
+ }
+
+ }
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-21
14:24:50 UTC (rev 11575)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-21
15:34:20 UTC (rev 11576)
@@ -568,7 +568,7 @@
if (holder != null)
{
- holder.consumer.close();
+ holder.close();
// holder.session.close();
consumers[i] = null;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java 2011-10-21
14:24:50 UTC (rev 11575)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java 2011-10-21
15:34:20 UTC (rev 11576)
@@ -19,15 +19,20 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.tests.util.ServiceTestBase;
/**
* A DeleteMessagesRestartTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
+ *
* Created Mar 2, 2009 10:14:38 AM
*
*
@@ -81,7 +86,7 @@
for (int i = 0; i < 100; i++)
{
- ClientMessage msg = createBytesMessage(session, new byte[0], true);
+ ClientMessage msg = createBytesMessage(session, HornetQBytesMessage.TYPE, new
byte[0], true);
prod.send(msg);
}
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java
===================================================================
---
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java 2011-10-21
14:24:50 UTC (rev 11575)
+++
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java 2011-10-21
15:34:20 UTC (rev 11576)
@@ -28,11 +28,12 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.tests.util.ServiceTestBase;
/**
* This is an integration-tests that will take some time to run.
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*/
public class PageStressTest extends ServiceTestBase
@@ -84,7 +85,7 @@
ClientProducer prod = session.createProducer(address);
- ClientMessage message = createBytesMessage(session, new byte[700], true);
+ ClientMessage message = createBytesMessage(session, HornetQBytesMessage.TYPE,
new byte[700], true);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
{
@@ -199,7 +200,7 @@
ClientProducer prod = session.createProducer(address);
- ClientMessage message = createBytesMessage(session, new byte[700], false);
+ ClientMessage message = createBytesMessage(session, HornetQBytesMessage.TYPE,
new byte[700], false);
int NUMBER_OF_MESSAGES = 60000;
@@ -299,7 +300,7 @@
clearData();
locator = createInVMNonHALocator();
-
+
locator.setBlockOnAcknowledge(true);
locator.setBlockOnDurableSend(false);
locator.setBlockOnNonDurableSend(false);
Deleted: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-10-21
14:24:50 UTC (rev 11575)
+++
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-10-21
15:34:20 UTC (rev 11576)
@@ -1,812 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.util;
-
-import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.management.MBeanServer;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.jms.client.HornetQBytesMessage;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
-import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- *
- * Base class with basic utilities on starting up a basic server
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
- *
- */
-public abstract class ServiceTestBase extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- protected static final long WAIT_TIMEOUT = 10000;
-
-
- // Attributes ----------------------------------------------------
-
- protected static final String INVM_ACCEPTOR_FACTORY =
InVMAcceptorFactory.class.getCanonicalName();
-
- public static final String INVM_CONNECTOR_FACTORY =
InVMConnectorFactory.class.getCanonicalName();
-
- protected static final String NETTY_ACCEPTOR_FACTORY =
NettyAcceptorFactory.class.getCanonicalName();
-
- protected static final String NETTY_CONNECTOR_FACTORY =
NettyConnectorFactory.class.getCanonicalName();
-
- private final List<ServerLocator> locators = new
ArrayList<ServerLocator>();
-
- @Override
- protected void tearDown() throws Exception
- {
- for (ServerLocator locator : locators)
- {
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- locators.clear();
- super.tearDown();
-// checkFreePort(5445);
-// checkFreePort(5446);
-// checkFreePort(5447);
- if (InVMRegistry.instance.size() > 0)
- {
- fail("InVMREgistry size > 0");
- }
- }
-
- public static final void closeServerLocator(ServerLocator locator)
- {
- if (locator == null)
- return;
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- protected void waitForTopology(final HornetQServer server, final int nodes) throws
Exception
- {
- waitForTopology(server, nodes, WAIT_TIMEOUT);
- }
-
- protected void waitForTopology(final HornetQServer server, final int nodes, final long
timeout) throws Exception
- {
- log.debug("waiting for " + nodes + " on the topology for server =
" + server);
-
- long start = System.currentTimeMillis();
-
- Set<ClusterConnection> ccs =
server.getClusterManager().getClusterConnections();
-
- if (ccs.size() != 1)
- {
- throw new IllegalStateException("You need a single cluster connection on
this version of waitForTopology on ServiceTestBase");
- }
-
- Topology topology = ccs.iterator().next().getTopology();
-
- do
- {
- if (nodes == topology.getMembers().size())
- {
- return;
- }
-
- Thread.sleep(10);
- }
- while (System.currentTimeMillis() - start < timeout);
-
- String msg = "Timed out waiting for cluster topology of " + nodes +
- " (received " +
- topology.getMembers().size() +
- ") topology = " +
- topology +
- ")";
-
- log.error(msg);
-
- throw new Exception(msg);
- }
-
-
- protected void waitForTopology(final HornetQServer server, String
clusterConnectionName, final int nodes, final long timeout) throws Exception
- {
- log.debug("waiting for " + nodes + " on the topology for server =
" + server);
-
- long start = System.currentTimeMillis();
-
- ClusterConnection clusterConnection =
server.getClusterManager().getClusterConnection(clusterConnectionName);
-
-
- Topology topology = clusterConnection.getTopology();
-
- do
- {
- if (nodes == topology.getMembers().size())
- {
- return;
- }
-
- Thread.sleep(10);
- }
- while (System.currentTimeMillis() - start < timeout);
-
- String msg = "Timed out waiting for cluster topology of " + nodes +
- " (received " +
- topology.getMembers().size() +
- ") topology = " +
- topology +
- ")";
-
- log.error(msg);
-
- throw new Exception(msg);
- }
-
- protected final static void waitForComponent(final HornetQComponent component, final
long seconds) throws Exception
- {
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (!component.isStarted())
- {
- try
- {
- Thread.sleep(50);
- }
- catch (InterruptedException e)
- {
- // ignore
- }
- if (System.currentTimeMillis() > (time + toWait))
- {
- fail("component did not start within timeout of " + seconds);
- }
- }
- }
-
- protected static final void stopComponent(HornetQComponent component)
- {
- if (component == null)
- return;
- if (component.isStarted())
- try
- {
- component.stop();
- }
- catch (Exception e)
- {
- // no-op
- }
- }
-
- protected static Map<String, Object> generateParams(final int node, final
boolean netty)
- {
- Map<String, Object> params = new HashMap<String, Object>();
-
- if (netty)
- {
-
params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
- org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT
+ node);
- }
- else
- {
-
params.put(org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME,
node);
- }
-
- return params;
- }
-
- protected static TransportConfiguration createTransportConfiguration(boolean netty,
- boolean
acceptor,
- Map<String,
Object> params)
- {
- String className;
- if (netty)
- {
- if (acceptor)
- {
- className = NettyAcceptorFactory.class.getName();
- }
- else
- {
- className = NettyConnectorFactory.class.getName();
- }
- }
- else
- {
- if (acceptor)
- {
- className = InVMAcceptorFactory.class.getName();
- }
- else
- {
- className = InVMConnectorFactory.class.getName();
- }
- }
- return new TransportConfiguration(className, params);
- }
-
- // Static --------------------------------------------------------
- private final Logger log = Logger.getLogger(this.getClass());
-
- // Constructors --------------------------------------------------
-
- public ServiceTestBase()
- {
- super();
- }
-
- public ServiceTestBase(final String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void waitForServer(HornetQServer server) throws InterruptedException
- {
- long timetowait = System.currentTimeMillis() + 5000;
- while (!server.isStarted() && System.currentTimeMillis() < timetowait)
- {
- Thread.sleep(100);
- }
-
- if (!server.isStarted())
- {
- log.info(threadDump("Server didn't start"));
- fail("server didnt start");
- }
-
- if (!server.getConfiguration().isBackup())
- {
- timetowait = System.currentTimeMillis() + 5000;
- while (!server.isInitialised() && System.currentTimeMillis() <
timetowait)
- {
- Thread.sleep(100);
- }
-
- if (!server.isInitialised())
- {
- fail("Server didn't initialize");
- }
- }
- }
-
-
- protected HornetQServer createServer(final boolean realFiles,
- final Configuration configuration,
- final int pageSize,
- final int maxAddressSize,
- final Map<String, AddressSettings>
settings,
- final MBeanServer mbeanServer)
- {
- HornetQServer server;
-
- if (realFiles)
- {
- server = HornetQServers.newHornetQServer(configuration, mbeanServer, true);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, mbeanServer, false);
- }
-
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
- {
- server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
- }
-
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setPageSizeBytes(pageSize);
- defaultSetting.setMaxSizeBytes(maxAddressSize);
-
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- return server;
- }
-
- protected HornetQServer createServer(final boolean realFiles,
- final Configuration configuration,
- final int pageSize,
- final int maxAddressSize,
- final Map<String, AddressSettings>
settings)
- {
- return createServer(realFiles, configuration, pageSize, maxAddressSize,
AddressFullMessagePolicy.PAGE, settings);
- }
-
- protected HornetQServer createServer(final boolean realFiles,
- final Configuration configuration,
- final int pageSize,
- final int maxAddressSize,
- final AddressFullMessagePolicy fullPolicy,
- final Map<String, AddressSettings>
settings)
- {
- HornetQServer server;
-
- if (realFiles)
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
-
- if (settings != null)
- {
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
- {
- server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
- }
- }
-
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setPageSizeBytes(pageSize);
- defaultSetting.setMaxSizeBytes(maxAddressSize);
- defaultSetting.setAddressFullMessagePolicy(fullPolicy);
-
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- return server;
- }
-
- protected HornetQServer createServer(final boolean realFiles,
- final Configuration configuration,
- final MBeanServer mbeanServer,
- final Map<String, AddressSettings>
settings)
- {
- HornetQServer server;
-
- if (realFiles)
- {
- server = HornetQServers.newHornetQServer(configuration, mbeanServer);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, mbeanServer, false);
- }
-
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
- {
- server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
- }
-
- AddressSettings defaultSetting = new AddressSettings();
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- return server;
- }
-
- protected HornetQServer createServer(final boolean realFiles)
- {
- return createServer(realFiles, false);
- }
-
- protected HornetQServer createServer(final boolean realFiles, final boolean netty)
- {
- return createServer(realFiles, createDefaultConfig(netty), -1, -1, new
HashMap<String, AddressSettings>());
- }
-
- protected HornetQServer createServer(final boolean realFiles, final Configuration
configuration)
- {
- return createServer(realFiles, configuration, -1, -1, new HashMap<String,
AddressSettings>());
- }
-
- protected HornetQServer createInVMFailoverServer(final boolean realFiles,
- final Configuration configuration,
- final NodeManager nodeManager,
- final int id)
- {
- return createInVMFailoverServer(realFiles,
- configuration,
- -1,
- -1,
- new HashMap<String, AddressSettings>(),
- nodeManager,
- id);
- }
-
- protected HornetQServer createInVMFailoverServer(final boolean realFiles,
- final Configuration configuration,
- final int pageSize,
- final int maxAddressSize,
- final Map<String,
AddressSettings> settings,
- NodeManager nodeManager,
- final int id)
- {
- HornetQServer server;
- HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
- configuration.setPersistenceEnabled(realFiles);
- server = new InVMNodeManagerServer(configuration,
- ManagementFactory.getPlatformMBeanServer(),
- securityManager,
- nodeManager);
-
- server.setIdentity("Server " + id);
-
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
- {
- server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
- }
-
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setPageSizeBytes(pageSize);
- defaultSetting.setMaxSizeBytes(maxAddressSize);
-
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- return server;
- }
-
- protected HornetQServer createServer(final boolean realFiles,
- final Configuration configuration,
- final HornetQSecurityManager securityManager)
- {
- HornetQServer server;
-
- if (realFiles)
- {
- server = HornetQServers.newHornetQServer(configuration,
-
ManagementFactory.getPlatformMBeanServer(),
- securityManager);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration,
-
ManagementFactory.getPlatformMBeanServer(),
- securityManager,
- false);
- }
-
- Map<String, AddressSettings> settings = new HashMap<String,
AddressSettings>();
-
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
- {
- server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
- }
-
- AddressSettings defaultSetting = new AddressSettings();
-
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- return server;
- }
-
- protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
- final int index,
- final boolean realFiles,
- final Map<String,
Object> params)
- {
- if (isNetty)
- {
- return createServer(realFiles,
- createClusteredDefaultConfig(index, params,
NETTY_ACCEPTOR_FACTORY),
- -1,
- -1,
- new HashMap<String, AddressSettings>());
- }
- else
- {
- return createServer(realFiles,
- createClusteredDefaultConfig(index, params,
INVM_ACCEPTOR_FACTORY),
- -1,
- -1,
- new HashMap<String, AddressSettings>());
- }
- }
-
- protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
- final int index,
- final boolean realFiles,
- final int pageSize,
- final int maxAddressSize,
- final Map<String,
Object> params)
- {
- if (isNetty)
- {
- return createServer(realFiles,
- createClusteredDefaultConfig(index, params,
NETTY_ACCEPTOR_FACTORY),
- pageSize,
- maxAddressSize,
- new HashMap<String, AddressSettings>());
- }
- else
- {
- return createServer(realFiles,
- createClusteredDefaultConfig(index, params,
INVM_ACCEPTOR_FACTORY),
- -1,
- -1,
- new HashMap<String, AddressSettings>());
- }
- }
-
- protected ServerLocator createFactory(final boolean isNetty) throws Exception
- {
- if (isNetty)
- {
- return createNettyNonHALocator();
- }
- else
- {
- return createInVMNonHALocator();
- }
- }
-
- protected void createQueue(final String address, final String queue) throws Exception
- {
- ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory sf = locator.createSessionFactory();
- ClientSession session = sf.createSession();
- session.createQueue(address, queue);
- session.close();
- sf.close();
- locator.close();
- }
-
- protected ServerLocator createInVMNonHALocator()
- {
- return createNonHALocator(false);
- }
-
- protected ServerLocator createNettyNonHALocator()
- {
- return createNonHALocator(true);
- }
-
- protected ServerLocator createNonHALocator(final boolean isNetty)
- {
- ServerLocator locatorWithoutHA = isNetty ?
HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(NETTY_CONNECTOR_FACTORY))
- :
HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
- locators.add(locatorWithoutHA);
- return locatorWithoutHA;
- }
-
- protected ServerLocator createInVMLocator(final int serverID)
- {
- TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID,
UUIDGenerator.getInstance().generateStringUUID());
-
- return HornetQClient.createServerLocatorWithHA(tnspConfig);
- }
-
- /**
- * @param serverID
- * @return
- */
- protected TransportConfiguration createInVMTransportConnectorConfig(final int
serverID, String name)
- {
- Map<String, Object> server1Params = new HashMap<String, Object>();
-
- if (serverID != 0)
- {
- server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
- }
-
- TransportConfiguration tnspConfig = new
TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params, name);
- return tnspConfig;
- }
-
- // XXX unused
- protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws
Exception
- {
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(connectorClass));
- return (ClientSessionFactoryImpl)locator.createSessionFactory();
-
- }
- public String getTextMessage(final ClientMessage m)
- {
- m.getBodyBuffer().resetReaderIndex();
- return m.getBodyBuffer().readString();
- }
-
- protected ClientMessage createBytesMessage(final ClientSession session, final byte[]
b, final boolean durable)
- {
- ClientMessage message = session.createMessage(HornetQBytesMessage.TYPE,
- durable,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.getBodyBuffer().writeBytes(b);
- return message;
- }
-
- /**
- * @param i
- * @param message
- * @throws Exception
- */
- protected void setBody(final int i, final ClientMessage message) throws Exception
- {
- message.getBodyBuffer().writeString("message" + i);
- }
-
- /**
- * @param i
- * @param message
- */
- protected void assertMessageBody(final int i, final ClientMessage message)
- {
- Assert.assertEquals(message.toString(), "message" + i,
message.getBodyBuffer().readString());
- }
-
- /**
- * Send durable messages with pre-specified body.
- * @param session
- * @param producer
- * @param numMessages
- * @throws Exception
- */
- public final void sendMessages(ClientSession session, ClientProducer producer, int
numMessages) throws Exception
- {
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(true);
- setBody(i, message);
- message.putIntProperty("counter", i);
- producer.send(message);
- }
- }
-
-
- protected final void receiveMessagesAndAck(ClientConsumer consumer, final int start,
int msgCount)
- throws HornetQException
- {
- for (int i = start; i < msgCount; i++)
- {
- ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull("Expecting a message " + i, message);
- assertMessageBody(i, message);
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- message.acknowledge();
- }
- }
-
- /**
- * Deleting a file on LargeDire is an asynchronous process. We need to keep looking
for a while
- * if the file hasn't been deleted yet.
- */
- protected void validateNoFilesOnLargeDir(final int expect) throws Exception
- {
- File largeMessagesFileDir = new File(getLargeMessagesDir());
-
- // Deleting the file is async... we keep looking for a period of the time until the
file is really gone
- long timeout = System.currentTimeMillis() + 5000;
- while (timeout > System.currentTimeMillis() &&
largeMessagesFileDir.listFiles().length != expect)
- {
- Thread.sleep(100);
- }
-
-
- if (expect != largeMessagesFileDir.listFiles().length)
- {
- for (File file : largeMessagesFileDir.listFiles())
- {
- System.out.println("File " + file + " still on ");
- }
- }
-
- Assert.assertEquals(expect, largeMessagesFileDir.listFiles().length);
- }
-
- /**
- * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking
for a while
- * if the file hasn't been deleted yet
- */
- protected void validateNoFilesOnLargeDir() throws Exception
- {
- validateNoFilesOnLargeDir(0);
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
- class InVMNodeManagerServer extends HornetQServerImpl
- {
- final NodeManager nodeManager;
-
- public InVMNodeManagerServer(final NodeManager nodeManager)
- {
- super();
- this.nodeManager = nodeManager;
- }
-
- public InVMNodeManagerServer(final Configuration configuration, final NodeManager
nodeManager)
- {
- super(configuration);
- this.nodeManager = nodeManager;
- }
-
- public InVMNodeManagerServer(final Configuration configuration,
- final MBeanServer mbeanServer,
- final NodeManager nodeManager)
- {
- super(configuration, mbeanServer);
- this.nodeManager = nodeManager;
- }
-
- public InVMNodeManagerServer(final Configuration configuration,
- final HornetQSecurityManager securityManager,
- final NodeManager nodeManager)
- {
- super(configuration, securityManager);
- this.nodeManager = nodeManager;
- }
-
- public InVMNodeManagerServer(final Configuration configuration,
- final MBeanServer mbeanServer,
- final HornetQSecurityManager securityManager,
- final NodeManager nodeManager)
- {
- super(configuration, mbeanServer, securityManager);
- this.nodeManager = nodeManager;
- }
-
- @Override
- protected NodeManager createNodeManager(final String directory)
- {
- return nodeManager;
- }
-
- }
-}