JBoss hornetq SVN: r9689 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-15 09:00:32 -0400 (Wed, 15 Sep 2010)
New Revision: 9689
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
Log:
added test for failback to primary live server
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-15 00:39:30 UTC (rev 9688)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-15 13:00:32 UTC (rev 9689)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -70,7 +71,7 @@
protected abstract boolean isNetty();
- protected int waitForBackup(long seconds, List<TestableServer> servers, int... nodes)
+ protected int waitForBackup(long seconds, Map<Integer, TestableServer> servers, int... nodes)
{
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-15 00:39:30 UTC (rev 9688)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-15 13:00:32 UTC (rev 9689)
@@ -14,7 +14,9 @@
package org.hornetq.tests.integration.cluster.failover;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
@@ -30,7 +32,7 @@
*/
public class MultipleLivesMultipleBackupsFailoverTest extends MultipleBackupsFailoverTestBase
{
- protected ArrayList<TestableServer> servers = new ArrayList<TestableServer>(5);
+ protected Map<Integer, TestableServer> servers = new HashMap<Integer, TestableServer>();
public void testMultipleFailovers2LiveServers() throws Exception
{
@@ -124,7 +126,7 @@
config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode);
- servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
+ servers.put(nodeid, new SameProcessHornetQServer(createFakeLockServer(true, config1)));
}
protected void createLiveConfig(int liveNode, int ... otherLiveNodes)
@@ -154,7 +156,7 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
+ servers.put(liveNode, new SameProcessHornetQServer(createFakeLockServer(true, config0)));
}
protected boolean isNetty()
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2010-09-15 00:39:30 UTC (rev 9688)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2010-09-15 13:00:32 UTC (rev 9689)
@@ -76,13 +76,13 @@
@Override
protected void createLiveConfig(int liveNode, int... otherLiveNodes)
{
- servers.add(new RemoteProcessHornetQServer(lives.get(liveNode)));
+ servers.put(liveNode, new RemoteProcessHornetQServer(lives.get(liveNode)));
}
@Override
protected void createBackupConfig(int liveNode, int nodeid, boolean createClusterConnections, int... nodes)
{
- servers.add(new RemoteProcessHornetQServer(backups.get(nodeid)));
+ servers.put(nodeid, new RemoteProcessHornetQServer(backups.get(nodeid)));
}
// Private -------------------------------------------------------
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2010-09-15 00:39:30 UTC (rev 9688)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2010-09-15 13:00:32 UTC (rev 9689)
@@ -19,6 +19,9 @@
import java.util.Map;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
@@ -42,6 +45,62 @@
// Public --------------------------------------------------------
+ /**
+ * Checks that if the live server is restarted, it will became live again after killin the current activated server.
+ */
+ public void testMultipleFailoversAndRestartLiveServer() throws Exception
+ {
+ createLiveConfig(0);
+ createBackupConfig(0, 1, false, 0, 2, 3);
+ createBackupConfig(0, 2, false, 0, 1, 3);
+ createBackupConfig(0, 3, false, 0, 1, 2);
+ servers.get(0).start();
+ servers.get(1).start();
+ servers.get(2).start();
+ servers.get(3).start();
+
+ ServerLocator locator = getServerLocator(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ int backupNode;
+ ClientSession session = sendAndConsume(sf, true);
+ System.out.println("failing live node ");
+ servers.get(0).crash(session);
+
+ session.close();
+ backupNode = waitForBackup(5, servers, 1, 2, 3);
+ session = sendAndConsume(sf, false);
+
+ System.out.println("restarting live node as a backup");
+ createBackupConfig(0, 0, false, 1, 2, 3);
+ servers.get(0).start();
+
+ System.out.println("stopping waiting nodes");
+ for (int i = 1; i <= 3; i++)
+ {
+ if (i != backupNode)
+ {
+ System.out.println("stopping node " + i);
+ servers.get(i).stop();
+ }
+ }
+
+ System.out.println("failing node " + backupNode);
+ servers.get(backupNode).crash(session);
+ session.close();
+ backupNode = waitForBackup(5, servers, 0);
+ assertEquals(0, backupNode);
+ session = sendAndConsume(sf, false);
+
+ locator.close();
+
+ servers.get(0).stop();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -51,6 +110,7 @@
{
super.setUp();
+ backups.put(0, SharedBackupServerConfiguration0.class.getName());
backups.put(1, SharedBackupServerConfiguration1.class.getName());
backups.put(2, SharedBackupServerConfiguration2.class.getName());
backups.put(3, SharedBackupServerConfiguration3.class.getName());
@@ -66,13 +126,13 @@
@Override
protected void createLiveConfig(int liveNode, int... otherLiveNodes)
{
- servers.add(new RemoteProcessHornetQServer(SharedLiveServerConfiguration.class.getName()));
+ servers.put(liveNode, new RemoteProcessHornetQServer(SharedLiveServerConfiguration.class.getName()));
}
@Override
protected void createBackupConfig(int liveNode, int nodeid, boolean createClusterConnections, int... nodes)
{
- servers.add(new RemoteProcessHornetQServer(backups.get(nodeid)));
+ servers.put(nodeid, new RemoteProcessHornetQServer(backups.get(nodeid)));
}
// Private -------------------------------------------------------
@@ -117,6 +177,15 @@
}
}
+ public static class SharedBackupServerConfiguration0 extends RemoteServerConfiguration
+ {
+ @Override
+ public Configuration getConfiguration()
+ {
+ return createBackupConfiguration(0, 0, false, 1, 2, 3, 4, 5);
+ }
+ }
+
public static class SharedBackupServerConfiguration1 extends RemoteServerConfiguration
{
@Override
@@ -188,7 +257,7 @@
config1.setBackupConnectorConfiguration(connectorConfiguration);
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
-
+System.out.println(config1.getBindingsDirectory());
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
config1.setJournalDirectory(config1.getJournalDirectory() + "_" + liveNode);
config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-09-15 00:39:30 UTC (rev 9688)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-09-15 13:00:32 UTC (rev 9689)
@@ -13,8 +13,14 @@
package org.hornetq.tests.integration.cluster.failover;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
@@ -31,7 +37,7 @@
public class SingleLiveMultipleBackupsFailoverTest extends MultipleBackupsFailoverTestBase
{
- protected ArrayList<TestableServer> servers = new ArrayList<TestableServer>(5);
+ protected Map<Integer, TestableServer> servers = new HashMap<Integer, TestableServer>();
public void testMultipleFailovers() throws Exception
{
@@ -90,7 +96,7 @@
session.close();
servers.get(backupNode).stop();
}
-
+
protected void createBackupConfig(int liveNode, int nodeid, boolean createClusterConnections, int... nodes)
{
Configuration config1 = super.createDefaultConfig();
@@ -117,13 +123,12 @@
config1.setBackupConnectorConfiguration(connectorConfiguration);
config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
-
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
config1.setJournalDirectory(config1.getJournalDirectory() + "_" + liveNode);
config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode);
- servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
+ servers.put(nodeid, new SameProcessHornetQServer(createFakeLockServer(true, config1)));
}
protected void createLiveConfig(int liveNode, int ... otherLiveNodes)
@@ -153,7 +158,7 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
+ servers.put(liveNode, new SameProcessHornetQServer(createFakeLockServer(true, config0)));
}
protected boolean isNetty()
13 years, 9 months
JBoss hornetq SVN: r9688 - trunk/tests/src/org/hornetq/tests/soak/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-14 20:39:30 -0400 (Tue, 14 Sep 2010)
New Revision: 9688
Added:
trunk/tests/src/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
Log:
Adding new test (without diverts)
Added: trunk/tests/src/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java 2010-09-15 00:39:30 UTC (rev 9688)
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ClientSoakTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ClientNonDivertedSoakTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("ADD");
+
+ private static final boolean IS_NETTY = false;
+
+ private static final boolean IS_JOURNAL = false;
+
+ public static final int MIN_MESSAGES_ON_QUEUE = 5000;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private HornetQServer server;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig(ClientNonDivertedSoakTest.IS_NETTY);
+
+ config.setJournalFileSize(10 * 1024 * 1024);
+
+ server = createServer(IS_JOURNAL, config, -1, -1, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(ClientNonDivertedSoakTest.IS_NETTY);
+
+ ClientSession session = sf.createSession();
+
+ session.createQueue(ClientNonDivertedSoakTest.ADDRESS, ClientNonDivertedSoakTest.ADDRESS, true);
+
+ session.close();
+
+ sf.close();
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ server = null;
+ }
+
+ public void testSoakClient() throws Exception
+ {
+ final ClientSessionFactory sf = createFactory(IS_NETTY);
+
+ ClientSession session = sf.createSession(false, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putLongProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i + " messages");
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+
+ Receiver rec1 = new Receiver(createFactory(IS_NETTY), ADDRESS.toString());
+
+ Sender send = new Sender(createFactory(IS_NETTY), ADDRESS.toString(), new Receiver[] { rec1 });
+
+ send.start();
+ rec1.start();
+
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+ while (timeEnd > System.currentTimeMillis())
+ {
+ if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 )
+ {
+ System.out.println("There are sequence errors in some of the clients, please look at the logs");
+ break;
+ }
+
+ System.out.println("count = " + send.msgs);
+ Thread.sleep(10000);
+ }
+
+ send.setRunning(false);
+ rec1.setRunning(false);
+
+ send.join();
+ rec1.join();
+
+ assertEquals(0, send.getErrorsCount());
+ assertEquals(0, rec1.getErrorsCount());
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
13 years, 9 months
JBoss hornetq SVN: r9687 - in branches/Branch_2_1: tests/src/org/hornetq/tests/soak and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-14 19:39:40 -0400 (Tue, 14 Sep 2010)
New Revision: 9687
Added:
branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/
branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java
branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Receiver.java
branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Sender.java
Modified:
branches/Branch_2_1/merge-activity.txt
Log:
Merging test from trunk -r9685:9686
Modified: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt 2010-09-14 23:36:32 UTC (rev 9686)
+++ branches/Branch_2_1/merge-activity.txt 2010-09-14 23:39:40 UTC (rev 9687)
@@ -16,5 +16,7 @@
trunk is aligned with all the changes from Branch_2_1 up to r9668
- 14-sep-2010 - jmesnil - merger from trunk -r 9682:9683 - https://jira.jboss.org/browse/HORNETQ-509
+
+- 14-sep-2010 - clebert - merger from trunk -r 9685:9686
TODO: Bring changes from HORNETQ-469 as soon as it's stable (remove this line as soon as it's done)
Added: branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java (rev 0)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java 2010-09-14 23:39:40 UTC (rev 9687)
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.logging.Logger;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * WARNING: This is not a sample on how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class is doing the job of a TransactionManager that fits for the purpose of this test only,
+ * however there are many more pitfalls to deal with Transactions.
+ *
+ * This is just to stress and soak test Transactions with HornetQ.
+ *
+ * And this is dealing with XA directly for the purpose testing only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class ClientAbstract extends Thread
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ClientAbstract.class.getName());
+
+ // Attributes ----------------------------------------------------
+
+ protected ClientSession session;
+
+ protected final ClientSessionFactory sf;
+
+ protected Xid activeXid;
+
+ protected volatile boolean running = true;
+
+ protected volatile int errors = 0;
+
+ /**
+ * A commit was called
+ * case we don't find the Xid, means it was accepted
+ */
+ protected volatile boolean pendingCommit = false;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ClientAbstract(ClientSessionFactory sf)
+ {
+ this.sf = sf;
+ }
+
+ // Public --------------------------------------------------------
+
+ public ClientSession getConnection()
+ {
+ return session;
+ }
+
+ public int getErrorsCount()
+ {
+ return errors;
+ }
+
+ public final void connect()
+ {
+ while (running)
+ {
+ try
+ {
+ disconnect();
+
+ session = sf.createXASession();
+
+ if (activeXid != null)
+ {
+ synchronized (ClientAbstract.class)
+ {
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ boolean found = false;
+ for (Xid recXid : xids)
+ {
+ if (recXid.equals(activeXid))
+ {
+ // System.out.println("Calling commit after a prepare on " + this);
+ found = true;
+ callCommit();
+ }
+ }
+
+ if (!found)
+ {
+ if (pendingCommit)
+ {
+ onCommit();
+ }
+ else
+ {
+ onRollback();
+ }
+
+ activeXid = null;
+ pendingCommit = false;
+ }
+ }
+ }
+
+ connectClients();
+
+ break;
+ }
+ catch (Exception e)
+ {
+ ClientAbstract.log.warning("Can't connect to server, retrying");
+ disconnect();
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignored)
+ {
+ // if an interruption was sent, we will respect it and leave the loop
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ connect();
+ }
+
+ protected void callCommit() throws Exception
+ {
+ pendingCommit = true;
+ session.commit(activeXid, false);
+ pendingCommit = false;
+ activeXid = null;
+ onCommit();
+ }
+
+ protected void callPrepare() throws Exception
+ {
+ session.prepare(activeXid);
+ }
+
+ public void beginTX() throws Exception
+ {
+ activeXid = newXID();
+
+ session.start(activeXid, XAResource.TMNOFLAGS);
+ }
+
+ public void endTX() throws Exception
+ {
+ session.end(activeXid, XAResource.TMSUCCESS);
+ callPrepare();
+ callCommit();
+ }
+
+ public void setRunning(final boolean running)
+ {
+ this.running = running;
+ }
+
+ /**
+ * @return
+ */
+ private XidImpl newXID()
+ {
+ return new XidImpl("tst".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
+ protected abstract void connectClients() throws Exception;
+
+ protected abstract void onCommit();
+
+ protected abstract void onRollback();
+
+ public void disconnect()
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ session = null;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java (rev 0)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java 2010-09-14 23:39:40 UTC (rev 9687)
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ClientSoakTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ClientSoakTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("ADD");
+
+ private static final SimpleString DIVERTED_AD1 = ClientSoakTest.ADDRESS.concat("-1");
+
+ private static final SimpleString DIVERTED_AD2 = ClientSoakTest.ADDRESS.concat("-2");
+
+ private static final boolean IS_NETTY = true;
+
+ private static final boolean IS_JOURNAL = true;
+
+ public static final int MIN_MESSAGES_ON_QUEUE = 5000;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private HornetQServer server;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig(ClientSoakTest.IS_NETTY);
+
+ config.setJournalFileSize(10 * 1024 * 1024);
+
+ server = createServer(IS_JOURNAL, config, -1, -1, new HashMap<String, AddressSettings>());
+
+ DivertConfiguration divert1 = new DivertConfiguration("dv1",
+ "nm1",
+ ClientSoakTest.ADDRESS.toString(),
+ ClientSoakTest.DIVERTED_AD1.toString(),
+ true,
+ null,
+ null);
+
+ DivertConfiguration divert2 = new DivertConfiguration("dv2",
+ "nm2",
+ ClientSoakTest.ADDRESS.toString(),
+ ClientSoakTest.DIVERTED_AD2.toString(),
+ true,
+ null,
+ null);
+
+ ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert1);
+ divertList.add(divert2);
+
+ config.setDivertConfigurations(divertList);
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(ClientSoakTest.IS_NETTY);
+
+ ClientSession session = sf.createSession();
+
+ session.createQueue(ClientSoakTest.ADDRESS, ClientSoakTest.ADDRESS, true);
+
+ session.createQueue(ClientSoakTest.DIVERTED_AD1, ClientSoakTest.DIVERTED_AD1, true);
+
+ session.createQueue(ClientSoakTest.DIVERTED_AD2, ClientSoakTest.DIVERTED_AD2, true);
+
+ session.close();
+
+ sf.close();
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ server = null;
+ }
+
+ public void testSoakClient() throws Exception
+ {
+ final ClientSessionFactory sf = createFactory(IS_NETTY);
+
+ ClientSession session = sf.createSession(false, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putLongProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i + " messages");
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+
+ Receiver rec1 = new Receiver(createFactory(IS_NETTY), DIVERTED_AD1.toString());
+ Receiver rec2 = new Receiver(createFactory(IS_NETTY), DIVERTED_AD2.toString());
+
+ Sender send = new Sender(createFactory(IS_NETTY), ADDRESS.toString(), new Receiver[] { rec1, rec2 });
+
+ send.start();
+ rec1.start();
+ rec2.start();
+
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+ while (timeEnd > System.currentTimeMillis())
+ {
+ if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0)
+ {
+ System.out.println("There are sequence errors in some of the clients, please look at the logs");
+ break;
+ }
+ Thread.sleep(10000);
+ }
+
+ send.setRunning(false);
+ rec1.setRunning(false);
+ rec2.setRunning(false);
+
+ send.join();
+ rec1.join();
+ rec2.join();
+
+ assertEquals(0, send.getErrorsCount());
+ assertEquals(0, rec1.getErrorsCount());
+ assertEquals(0, rec2.getErrorsCount());
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Receiver.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Receiver.java (rev 0)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Receiver.java 2010-09-14 23:39:40 UTC (rev 9687)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.utils.ReusableLatch;
+
+/**
+ * A Receiver
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Receiver extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // We should leave some messages on paging. We don't want to consume all for this test
+ private final Semaphore minConsume = new Semaphore(0);
+
+ private final ReusableLatch latchMax = new ReusableLatch(0);
+
+ private static final int MAX_DIFF = 10000;
+
+ // The difference between producer and consuming
+ private final AtomicInteger currentDiff = new AtomicInteger(0);
+
+ private final String queue;
+
+ protected long msgs = 0;
+
+ protected int pendingMsgs = 0;
+
+ protected int pendingSemaphores = 0;
+
+ protected ClientConsumer cons;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public Receiver(ClientSessionFactory sf, String queue)
+ {
+ super(sf);
+ this.queue = queue;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void run()
+ {
+ super.run();
+
+ while (running)
+ {
+ try
+ {
+ beginTX();
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ if (msg == null)
+ {
+ break;
+ }
+
+ msg.acknowledge();
+
+ if (msg.getLongProperty("count") != msgs + pendingMsgs)
+ {
+ errors++;
+ System.out.println("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queue);
+ }
+
+ pendingMsgs++;
+ if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS))
+ {
+ break;
+ }
+
+ }
+
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#connectClients()
+ */
+ @Override
+ protected void connectClients() throws Exception
+ {
+
+ cons = session.createConsumer(queue);
+
+ session.start();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ msgs += pendingMsgs;
+ this.currentDiff.addAndGet(-pendingMsgs);
+ latchMax.countDown(pendingMsgs);
+ pendingMsgs = 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ minConsume.release(pendingMsgs);
+ pendingMsgs = 0;
+ }
+
+ public String toString()
+ {
+ return "Receiver::" + this.queue + ", msgs=" + msgs + ", pending=" + pendingMsgs;
+ }
+
+ /**
+ * @param pendingMsgs2
+ */
+ public void messageProduced(int producedMessages)
+ {
+ minConsume.release(producedMessages);
+ currentDiff.addAndGet(producedMessages);
+ if (currentDiff.get() > MAX_DIFF)
+ {
+ latchMax.setCount(currentDiff.get() - MAX_DIFF);
+ try
+ {
+ latchMax.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Sender.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Sender.java (rev 0)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/soak/client/Sender.java 2010-09-14 23:39:40 UTC (rev 9687)
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
+
+/**
+ * A Sender
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Sender extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected ClientProducer producer;
+
+ protected String queue;
+
+ protected long msgs = ClientSoakTest.MIN_MESSAGES_ON_QUEUE;
+
+ protected int pendingMsgs = 0;
+
+ protected final Receiver[] receivers;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public Sender(final ClientSessionFactory sf, String queue, final Receiver[] receivers)
+ {
+ super(sf);
+ this.receivers = receivers;
+ this.queue = queue;
+ }
+
+ @Override
+ protected void connectClients() throws Exception
+ {
+ producer = session.createProducer(queue);
+ }
+
+ public void run()
+ {
+ super.run();
+ while (running)
+ {
+ try
+ {
+ beginTX();
+ for (int i = 0; i < 1000; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putLongProperty("count", pendingMsgs + msgs);
+ msg.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ pendingMsgs++;
+ }
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ this.msgs += pendingMsgs;
+ for (Receiver rec : receivers)
+ {
+ rec.messageProduced(pendingMsgs);
+ }
+
+ pendingMsgs = 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ pendingMsgs = 0;
+ }
+
+ public String toString()
+ {
+ return "Sender, msgs=" + msgs + ", pending=" + pendingMsgs;
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
13 years, 9 months
JBoss hornetq SVN: r9686 - in trunk/tests/src/org/hornetq/tests/soak: client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-14 19:36:32 -0400 (Tue, 14 Sep 2010)
New Revision: 9686
Added:
trunk/tests/src/org/hornetq/tests/soak/client/
trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java
trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java
trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java
trunk/tests/src/org/hornetq/tests/soak/client/Sender.java
Log:
Adding test to replicate leakage
Added: trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java 2010-09-14 23:36:32 UTC (rev 9686)
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.logging.Logger;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * WARNING: This is not a sample on how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class is doing the job of a TransactionManager that fits for the purpose of this test only,
+ * however there are many more pitfalls to deal with Transactions.
+ *
+ * This is just to stress and soak test Transactions with HornetQ.
+ *
+ * And this is dealing with XA directly for the purpose testing only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class ClientAbstract extends Thread
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ClientAbstract.class.getName());
+
+ // Attributes ----------------------------------------------------
+
+ protected ClientSession session;
+
+ protected final ClientSessionFactory sf;
+
+ protected Xid activeXid;
+
+ protected volatile boolean running = true;
+
+ protected volatile int errors = 0;
+
+ /**
+ * A commit was called
+ * case we don't find the Xid, means it was accepted
+ */
+ protected volatile boolean pendingCommit = false;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ClientAbstract(ClientSessionFactory sf)
+ {
+ this.sf = sf;
+ }
+
+ // Public --------------------------------------------------------
+
+ public ClientSession getConnection()
+ {
+ return session;
+ }
+
+ public int getErrorsCount()
+ {
+ return errors;
+ }
+
+ public final void connect()
+ {
+ while (running)
+ {
+ try
+ {
+ disconnect();
+
+ session = sf.createXASession();
+
+ if (activeXid != null)
+ {
+ synchronized (ClientAbstract.class)
+ {
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ boolean found = false;
+ for (Xid recXid : xids)
+ {
+ if (recXid.equals(activeXid))
+ {
+ // System.out.println("Calling commit after a prepare on " + this);
+ found = true;
+ callCommit();
+ }
+ }
+
+ if (!found)
+ {
+ if (pendingCommit)
+ {
+ onCommit();
+ }
+ else
+ {
+ onRollback();
+ }
+
+ activeXid = null;
+ pendingCommit = false;
+ }
+ }
+ }
+
+ connectClients();
+
+ break;
+ }
+ catch (Exception e)
+ {
+ ClientAbstract.log.warning("Can't connect to server, retrying");
+ disconnect();
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignored)
+ {
+ // if an interruption was sent, we will respect it and leave the loop
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ connect();
+ }
+
+ protected void callCommit() throws Exception
+ {
+ pendingCommit = true;
+ session.commit(activeXid, false);
+ pendingCommit = false;
+ activeXid = null;
+ onCommit();
+ }
+
+ protected void callPrepare() throws Exception
+ {
+ session.prepare(activeXid);
+ }
+
+ public void beginTX() throws Exception
+ {
+ activeXid = newXID();
+
+ session.start(activeXid, XAResource.TMNOFLAGS);
+ }
+
+ public void endTX() throws Exception
+ {
+ session.end(activeXid, XAResource.TMSUCCESS);
+ callPrepare();
+ callCommit();
+ }
+
+ public void setRunning(final boolean running)
+ {
+ this.running = running;
+ }
+
+ /**
+ * @return
+ */
+ private XidImpl newXID()
+ {
+ return new XidImpl("tst".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
+ protected abstract void connectClients() throws Exception;
+
+ protected abstract void onCommit();
+
+ protected abstract void onRollback();
+
+ public void disconnect()
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ session = null;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java 2010-09-14 23:36:32 UTC (rev 9686)
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ClientSoakTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ClientSoakTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("ADD");
+
+ private static final SimpleString DIVERTED_AD1 = ClientSoakTest.ADDRESS.concat("-1");
+
+ private static final SimpleString DIVERTED_AD2 = ClientSoakTest.ADDRESS.concat("-2");
+
+ private static final boolean IS_NETTY = true;
+
+ private static final boolean IS_JOURNAL = true;
+
+ public static final int MIN_MESSAGES_ON_QUEUE = 5000;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private HornetQServer server;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig(ClientSoakTest.IS_NETTY);
+
+ config.setJournalFileSize(10 * 1024 * 1024);
+
+ server = createServer(IS_JOURNAL, config, -1, -1, new HashMap<String, AddressSettings>());
+
+ DivertConfiguration divert1 = new DivertConfiguration("dv1",
+ "nm1",
+ ClientSoakTest.ADDRESS.toString(),
+ ClientSoakTest.DIVERTED_AD1.toString(),
+ true,
+ null,
+ null);
+
+ DivertConfiguration divert2 = new DivertConfiguration("dv2",
+ "nm2",
+ ClientSoakTest.ADDRESS.toString(),
+ ClientSoakTest.DIVERTED_AD2.toString(),
+ true,
+ null,
+ null);
+
+ ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert1);
+ divertList.add(divert2);
+
+ config.setDivertConfigurations(divertList);
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(ClientSoakTest.IS_NETTY);
+
+ ClientSession session = sf.createSession();
+
+ session.createQueue(ClientSoakTest.ADDRESS, ClientSoakTest.ADDRESS, true);
+
+ session.createQueue(ClientSoakTest.DIVERTED_AD1, ClientSoakTest.DIVERTED_AD1, true);
+
+ session.createQueue(ClientSoakTest.DIVERTED_AD2, ClientSoakTest.DIVERTED_AD2, true);
+
+ session.close();
+
+ sf.close();
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ server = null;
+ }
+
+ public void testSoakClient() throws Exception
+ {
+ final ClientSessionFactory sf = createFactory(IS_NETTY);
+
+ ClientSession session = sf.createSession(false, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putLongProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i + " messages");
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+ sf.close();
+
+ Receiver rec1 = new Receiver(createFactory(IS_NETTY), DIVERTED_AD1.toString());
+ Receiver rec2 = new Receiver(createFactory(IS_NETTY), DIVERTED_AD2.toString());
+
+ Sender send = new Sender(createFactory(IS_NETTY), ADDRESS.toString(), new Receiver[] { rec1, rec2 });
+
+ send.start();
+ rec1.start();
+ rec2.start();
+
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+ while (timeEnd > System.currentTimeMillis())
+ {
+ if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0)
+ {
+ System.out.println("There are sequence errors in some of the clients, please look at the logs");
+ break;
+ }
+ Thread.sleep(10000);
+ }
+
+ send.setRunning(false);
+ rec1.setRunning(false);
+ rec2.setRunning(false);
+
+ send.join();
+ rec1.join();
+ rec2.join();
+
+ assertEquals(0, send.getErrorsCount());
+ assertEquals(0, rec1.getErrorsCount());
+ assertEquals(0, rec2.getErrorsCount());
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java 2010-09-14 23:36:32 UTC (rev 9686)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.utils.ReusableLatch;
+
+/**
+ * A Receiver
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Receiver extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // We should leave some messages on paging. We don't want to consume all for this test
+ private final Semaphore minConsume = new Semaphore(0);
+
+ private final ReusableLatch latchMax = new ReusableLatch(0);
+
+ private static final int MAX_DIFF = 10000;
+
+ // The difference between producer and consuming
+ private final AtomicInteger currentDiff = new AtomicInteger(0);
+
+ private final String queue;
+
+ protected long msgs = 0;
+
+ protected int pendingMsgs = 0;
+
+ protected int pendingSemaphores = 0;
+
+ protected ClientConsumer cons;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public Receiver(ClientSessionFactory sf, String queue)
+ {
+ super(sf);
+ this.queue = queue;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void run()
+ {
+ super.run();
+
+ while (running)
+ {
+ try
+ {
+ beginTX();
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ if (msg == null)
+ {
+ break;
+ }
+
+ msg.acknowledge();
+
+ if (msg.getLongProperty("count") != msgs + pendingMsgs)
+ {
+ errors++;
+ System.out.println("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queue);
+ }
+
+ pendingMsgs++;
+ if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS))
+ {
+ break;
+ }
+
+ }
+
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#connectClients()
+ */
+ @Override
+ protected void connectClients() throws Exception
+ {
+
+ cons = session.createConsumer(queue);
+
+ session.start();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ msgs += pendingMsgs;
+ this.currentDiff.addAndGet(-pendingMsgs);
+ latchMax.countDown(pendingMsgs);
+ pendingMsgs = 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ minConsume.release(pendingMsgs);
+ pendingMsgs = 0;
+ }
+
+ public String toString()
+ {
+ return "Receiver::" + this.queue + ", msgs=" + msgs + ", pending=" + pendingMsgs;
+ }
+
+ /**
+ * @param pendingMsgs2
+ */
+ public void messageProduced(int producedMessages)
+ {
+ minConsume.release(producedMessages);
+ currentDiff.addAndGet(producedMessages);
+ if (currentDiff.get() > MAX_DIFF)
+ {
+ latchMax.setCount(currentDiff.get() - MAX_DIFF);
+ try
+ {
+ latchMax.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/hornetq/tests/soak/client/Sender.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/Sender.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/soak/client/Sender.java 2010-09-14 23:36:32 UTC (rev 9686)
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.soak.client;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
+
+/**
+ * A Sender
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Sender extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected ClientProducer producer;
+
+ protected String queue;
+
+ protected long msgs = ClientSoakTest.MIN_MESSAGES_ON_QUEUE;
+
+ protected int pendingMsgs = 0;
+
+ protected final Receiver[] receivers;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public Sender(final ClientSessionFactory sf, String queue, final Receiver[] receivers)
+ {
+ super(sf);
+ this.receivers = receivers;
+ this.queue = queue;
+ }
+
+ @Override
+ protected void connectClients() throws Exception
+ {
+ producer = session.createProducer(queue);
+ }
+
+ public void run()
+ {
+ super.run();
+ while (running)
+ {
+ try
+ {
+ beginTX();
+ for (int i = 0; i < 1000; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putLongProperty("count", pendingMsgs + msgs);
+ msg.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ pendingMsgs++;
+ }
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ this.msgs += pendingMsgs;
+ for (Receiver rec : receivers)
+ {
+ rec.messageProduced(pendingMsgs);
+ }
+
+ pendingMsgs = 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ pendingMsgs = 0;
+ }
+
+ public String toString()
+ {
+ return "Sender, msgs=" + msgs + ", pending=" + pendingMsgs;
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
13 years, 9 months
JBoss hornetq SVN: r9685 - branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-14 17:54:18 -0400 (Tue, 14 Sep 2010)
New Revision: 9685
Modified:
branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java
Log:
tweak (sync with trunk)
Modified: branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-14 08:38:01 UTC (rev 9684)
+++ branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-14 21:54:18 UTC (rev 9685)
@@ -14,6 +14,7 @@
package org.hornetq.jms.persistence.config;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -112,8 +113,8 @@
public void encode(final HornetQBuffer buffer)
{
buffer.writeByte(type.getType());
- BufferHelper.writeAsSimpleString(buffer, name);
- BufferHelper.writeAsNullableSimpleString(buffer, selector);
+ buffer.writeSimpleString(SimpleString.toSimpleString(name));
+ buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector));
buffer.writeBoolean(durable);
}
@@ -121,7 +122,8 @@
{
type = PersistedType.getType(buffer.readByte());
name = buffer.readSimpleString().toString();
- selector = BufferHelper.readNullableSimpleStringAsString(buffer);
+ SimpleString selectorStr = buffer.readNullableSimpleString();
+ selector = (selectorStr == null) ? null : selectorStr.toString();
durable = buffer.readBoolean();
}
}
13 years, 9 months
JBoss hornetq SVN: r9684 - in branches/Branch_2_1: src/main/org/hornetq/api/core/management and 2 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-14 04:38:01 -0400 (Tue, 14 Sep 2010)
New Revision: 9684
Modified:
branches/Branch_2_1/merge-activity.txt
branches/Branch_2_1/src/main/org/hornetq/api/core/management/AddressControl.java
branches/Branch_2_1/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-509 Diverts are listed in AddressControl.getQueueNames()
* list only QueueBindings in AddressControl.getQueueNames()
* add AddressControl.getBindingNames() to list *all* bindings
* tests
merge from trunk: svn merge -r 9682:9683 https://svn.jboss.org/repos/hornetq/trunk
Modified: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt 2010-09-14 08:20:09 UTC (rev 9683)
+++ branches/Branch_2_1/merge-activity.txt 2010-09-14 08:38:01 UTC (rev 9684)
@@ -15,4 +15,6 @@
- 10-sep-2010 - clebert - merge from this branch to trunk, r9591-9608, r9612-9659, r9660-9662
trunk is aligned with all the changes from Branch_2_1 up to r9668
+- 14-sep-2010 - jmesnil - merger from trunk -r 9682:9683 - https://jira.jboss.org/browse/HORNETQ-509
+
TODO: Bring changes from HORNETQ-469 as soon as it's stable (remove this line as soon as it's done)
Modified: branches/Branch_2_1/src/main/org/hornetq/api/core/management/AddressControl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/api/core/management/AddressControl.java 2010-09-14 08:20:09 UTC (rev 9683)
+++ branches/Branch_2_1/src/main/org/hornetq/api/core/management/AddressControl.java 2010-09-14 08:38:01 UTC (rev 9684)
@@ -56,6 +56,11 @@
*/
long getNumberOfBytesPerPage() throws Exception;
+ /**
+ * Returns the names of all bindings (both queues and diverts) bound to this address
+ */
+ String[] getBindingNames() throws Exception;
+
// Operations ----------------------------------------------------
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2010-09-14 08:20:09 UTC (rev 9683)
+++ branches/Branch_2_1/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2010-09-14 08:38:01 UTC (rev 9684)
@@ -13,6 +13,8 @@
package org.hornetq.core.management.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import javax.management.MBeanOperationInfo;
@@ -25,6 +27,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
import org.hornetq.core.settings.HierarchicalRepository;
@@ -86,13 +89,39 @@
try
{
Bindings bindings = postOffice.getBindingsForAddress(address);
- String[] queueNames = new String[bindings.getBindings().size()];
+ List<String> queueNames = new ArrayList<String>();
+ for (Binding binding : bindings.getBindings())
+ {
+ if (binding instanceof QueueBinding)
+ {
+ queueNames.add(binding.getUniqueName().toString());
+ }
+ }
+ return (String[])queueNames.toArray(new String[queueNames.size()]);
+ }
+ catch (Throwable t)
+ {
+ throw new IllegalStateException(t.getMessage());
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ public String[] getBindingNames() throws Exception
+ {
+ clearIO();
+ try
+ {
+ Bindings bindings = postOffice.getBindingsForAddress(address);
+ String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings())
{
- queueNames[i++] = binding.getUniqueName().toString();
+ bindingNames[i++] = binding.getUniqueName().toString();
}
- return queueNames;
+ return bindingNames;
}
catch (Throwable t)
{
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2010-09-14 08:20:09 UTC (rev 9683)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2010-09-14 08:38:01 UTC (rev 9684)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.management;
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
import java.util.HashSet;
import java.util.Set;
@@ -20,7 +22,11 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.RoleInfo;
import org.hornetq.core.config.Configuration;
@@ -96,7 +102,32 @@
session.deleteQueue(anotherQueue);
}
+
+ public void testGetBindingNames() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ String divertName = RandomUtil.randomString();
+
+ session.createQueue(address, queue, false);
+ AddressControl addressControl = createManagementControl(address);
+ String[] bindingNames = addressControl.getBindingNames();
+ assertEquals(1, bindingNames.length);
+ assertEquals(queue.toString(), bindingNames[0]);
+
+ server.getHornetQServerControl().createDivert(divertName, randomString(), address.toString(), RandomUtil.randomString(), false, null, null);
+
+ bindingNames = addressControl.getBindingNames();
+ Assert.assertEquals(2, bindingNames.length);
+
+ session.deleteQueue(queue);
+
+ bindingNames = addressControl.getBindingNames();
+ assertEquals(1, bindingNames.length);
+ assertEquals(divertName.toString(), bindingNames[0]);
+ }
+
public void testGetRoles() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-09-14 08:20:09 UTC (rev 9683)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-09-14 08:38:01 UTC (rev 9684)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.management;
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
import java.util.HashSet;
import java.util.Set;
@@ -25,7 +27,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ResourceNames;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -100,7 +101,32 @@
session.deleteQueue(anotherQueue);
}
+
+ public void testGetBindingNames() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ String divertName = RandomUtil.randomString();
+
+ session.createQueue(address, queue, false);
+ CoreMessagingProxy proxy = createProxy(address);
+ Object[] bindingNames = (Object[])proxy.retrieveAttributeValue("bindingNames");
+ assertEquals(1, bindingNames.length);
+ assertEquals(queue.toString(), bindingNames[0]);
+
+ server.getHornetQServerControl().createDivert(divertName, randomString(), address.toString(), RandomUtil.randomString(), false, null, null);
+
+ bindingNames = (Object[])proxy.retrieveAttributeValue("bindingNames");
+ assertEquals(2, bindingNames.length);
+
+ session.deleteQueue(queue);
+
+ bindingNames = (Object[])proxy.retrieveAttributeValue("bindingNames");
+ assertEquals(1, bindingNames.length);
+ assertEquals(divertName.toString(), bindingNames[0]);
+ }
+
public void testGetRoles() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
13 years, 9 months
JBoss hornetq SVN: r9683 - in trunk: src/main/org/hornetq/core/management/impl and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-14 04:20:09 -0400 (Tue, 14 Sep 2010)
New Revision: 9683
Modified:
trunk/src/main/org/hornetq/api/core/management/AddressControl.java
trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-509 Diverts are listed in AddressControl.getQueueNames()
* list only QueueBindings in AddressControl.getQueueNames()
* add AddressControl.getBindingNames() to list *all* bindings
* tests
Modified: trunk/src/main/org/hornetq/api/core/management/AddressControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/AddressControl.java 2010-09-13 23:36:57 UTC (rev 9682)
+++ trunk/src/main/org/hornetq/api/core/management/AddressControl.java 2010-09-14 08:20:09 UTC (rev 9683)
@@ -56,6 +56,11 @@
*/
long getNumberOfBytesPerPage() throws Exception;
+ /**
+ * Returns the names of all bindings (both queues and diverts) bound to this address
+ */
+ String[] getBindingNames() throws Exception;
+
// Operations ----------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2010-09-13 23:36:57 UTC (rev 9682)
+++ trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2010-09-14 08:20:09 UTC (rev 9683)
@@ -13,6 +13,8 @@
package org.hornetq.core.management.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import javax.management.MBeanOperationInfo;
@@ -25,6 +27,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
import org.hornetq.core.settings.HierarchicalRepository;
@@ -86,13 +89,39 @@
try
{
Bindings bindings = postOffice.getBindingsForAddress(address);
- String[] queueNames = new String[bindings.getBindings().size()];
+ List<String> queueNames = new ArrayList<String>();
+ for (Binding binding : bindings.getBindings())
+ {
+ if (binding instanceof QueueBinding)
+ {
+ queueNames.add(binding.getUniqueName().toString());
+ }
+ }
+ return (String[])queueNames.toArray(new String[queueNames.size()]);
+ }
+ catch (Throwable t)
+ {
+ throw new IllegalStateException(t.getMessage());
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ public String[] getBindingNames() throws Exception
+ {
+ clearIO();
+ try
+ {
+ Bindings bindings = postOffice.getBindingsForAddress(address);
+ String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings())
{
- queueNames[i++] = binding.getUniqueName().toString();
+ bindingNames[i++] = binding.getUniqueName().toString();
}
- return queueNames;
+ return bindingNames;
}
catch (Throwable t)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2010-09-13 23:36:57 UTC (rev 9682)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2010-09-14 08:20:09 UTC (rev 9683)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.management;
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
import java.util.HashSet;
import java.util.Set;
@@ -20,7 +22,11 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.RoleInfo;
import org.hornetq.core.config.Configuration;
@@ -96,7 +102,32 @@
session.deleteQueue(anotherQueue);
}
+
+ public void testGetBindingNames() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ String divertName = RandomUtil.randomString();
+
+ session.createQueue(address, queue, false);
+ AddressControl addressControl = createManagementControl(address);
+ String[] bindingNames = addressControl.getBindingNames();
+ assertEquals(1, bindingNames.length);
+ assertEquals(queue.toString(), bindingNames[0]);
+
+ server.getHornetQServerControl().createDivert(divertName, randomString(), address.toString(), RandomUtil.randomString(), false, null, null);
+
+ bindingNames = addressControl.getBindingNames();
+ Assert.assertEquals(2, bindingNames.length);
+
+ session.deleteQueue(queue);
+
+ bindingNames = addressControl.getBindingNames();
+ assertEquals(1, bindingNames.length);
+ assertEquals(divertName.toString(), bindingNames[0]);
+ }
+
public void testGetRoles() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-09-13 23:36:57 UTC (rev 9682)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-09-14 08:20:09 UTC (rev 9683)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.management;
+import static org.hornetq.tests.util.RandomUtil.randomString;
+
import java.util.HashSet;
import java.util.Set;
@@ -25,7 +27,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ResourceNames;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -100,7 +101,32 @@
session.deleteQueue(anotherQueue);
}
+
+ public void testGetBindingNames() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ String divertName = RandomUtil.randomString();
+
+ session.createQueue(address, queue, false);
+ CoreMessagingProxy proxy = createProxy(address);
+ Object[] bindingNames = (Object[])proxy.retrieveAttributeValue("bindingNames");
+ assertEquals(1, bindingNames.length);
+ assertEquals(queue.toString(), bindingNames[0]);
+
+ server.getHornetQServerControl().createDivert(divertName, randomString(), address.toString(), RandomUtil.randomString(), false, null, null);
+
+ bindingNames = (Object[])proxy.retrieveAttributeValue("bindingNames");
+ assertEquals(2, bindingNames.length);
+
+ session.deleteQueue(queue);
+
+ bindingNames = (Object[])proxy.retrieveAttributeValue("bindingNames");
+ assertEquals(1, bindingNames.length);
+ assertEquals(divertName.toString(), bindingNames[0]);
+ }
+
public void testGetRoles() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
13 years, 9 months
JBoss hornetq SVN: r9682 - in branches/Branch_2_1: src/config and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-13 19:36:57 -0400 (Mon, 13 Sep 2010)
New Revision: 9682
Modified:
branches/Branch_2_1/examples/common/build.xml
branches/Branch_2_1/src/config/examples-ant.properties
branches/Branch_2_1/src/config/javaee-examples-ant.properties
Log:
fixing examples for when running standalone (not as under EAP)
Modified: branches/Branch_2_1/examples/common/build.xml
===================================================================
--- branches/Branch_2_1/examples/common/build.xml 2010-09-13 22:45:10 UTC (rev 9681)
+++ branches/Branch_2_1/examples/common/build.xml 2010-09-13 23:36:57 UTC (rev 9682)
@@ -66,6 +66,7 @@
<fileset dir="${jars.dir}">
<include name="**/${jms-library}"/>
<include name="**/jboss-kernel.jar"/>
+ <include name="**/jboss-mc.jar"/>
</fileset>
<path refid="extra.classpath"/>
</path>
@@ -81,6 +82,7 @@
<include name="**/jnpserver.jar"/>
<include name="**/${jms-library}"/>
<include name="**/netty.jar"/>
+ <include name="**/jboss-mc.jar"/>
</fileset>
<path refid="extra.classpath"/>
</path>
@@ -97,6 +99,7 @@
<include name="**/jboss-logging-spi.jar"/>
<include name="**/${jms-library}"/>
<include name="**/jboss-kernel.jar"/>
+ <include name="**/jboss-mc.jar"/>
</fileset>
</path>
@@ -119,6 +122,7 @@
<include name="**/jboss-mdr.jar"/>
<include name="**/jbossxb.jar"/>
<include name="**/jboss-xml-binding.jar"/>
+ <include name="**/jboss-mc.jar"/>
<include name="**/twitter4j*.jar"/>
</fileset>
</path>
Modified: branches/Branch_2_1/src/config/examples-ant.properties
===================================================================
--- branches/Branch_2_1/src/config/examples-ant.properties 2010-09-13 22:45:10 UTC (rev 9681)
+++ branches/Branch_2_1/src/config/examples-ant.properties 2010-09-13 23:36:57 UTC (rev 9682)
@@ -1,4 +1,5 @@
hornetq.example.logserveroutput=true
hornetq.jars.dir=${imported.basedir}/../../lib
jars.dir=${imported.basedir}/../../lib
-aio.library.path=${imported.basedir}/../../bin
\ No newline at end of file
+aio.library.path=${imported.basedir}/../../bin
+jms-library=jboss-jms-api.jar
\ No newline at end of file
Modified: branches/Branch_2_1/src/config/javaee-examples-ant.properties
===================================================================
--- branches/Branch_2_1/src/config/javaee-examples-ant.properties 2010-09-13 22:45:10 UTC (rev 9681)
+++ branches/Branch_2_1/src/config/javaee-examples-ant.properties 2010-09-13 23:36:57 UTC (rev 9682)
@@ -2,3 +2,6 @@
hornetq.jars.dir=${imported.basedir}/../../../lib
jars.dir=${imported.basedir}/../../../lib
aio.library.path=${imported.basedir}/../../native/bin
+server.default=default-with-hornetq
+hornetq.config.dir=hornetq.sar
+jms-library=jboss-jms-api.jar
13 years, 9 months
JBoss hornetq SVN: r9681 - in trunk: src/main/org/hornetq/core/paging/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-13 18:45:10 -0400 (Mon, 13 Sep 2010)
New Revision: 9681
Modified:
trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java
trunk/src/main/org/hornetq/core/paging/PagingManager.java
trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
Fixing a leak on PageTransaction and adding tests on ordering for paging
Modified: trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -40,12 +40,12 @@
long getTransactionID();
- void store(StorageManager storageManager,Transaction tx) throws Exception;
+ void store(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
- void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
+ void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
// To be used after the update was stored or reload
- void update(int update, StorageManager storageManager);
+ void update(int update, StorageManager storageManager, PagingManager pagingManager);
void increment();
Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging;
+import java.util.Map;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.postoffice.PostOffice;
@@ -67,6 +69,8 @@
* @param transactionID
*/
void removeTransaction(long transactionID);
+
+ Map<Long, PageTransactionInfo> getTransactions();
/**
* Reload previously created PagingStores into memory
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -56,6 +57,7 @@
public PageTransactionInfoImpl(final long transactionID)
{
+ this();
this.transactionID = transactionID;
countDownCompleted = new CountDownLatch(1);
}
@@ -81,7 +83,7 @@
return transactionID;
}
- public void update(final int update, final StorageManager storageManager)
+ public void update(final int update, final StorageManager storageManager, PagingManager pagingManager)
{
int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
if (sizeAfterUpdate == 0 && storageManager != null)
@@ -95,6 +97,11 @@
log.warn("Can't delete page transaction id=" + this.recordID);
}
}
+
+ if (sizeAfterUpdate == 0 && pagingManager != null)
+ {
+ pagingManager.removeTransaction(this.transactionID);
+ }
}
public void increment()
@@ -149,7 +156,7 @@
}
}
- public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+ public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
{
storageManager.storePageTransaction(tx.getID(), this);
}
@@ -157,7 +164,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
*/
- public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+ public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int depages) throws Exception
{
storageManager.updatePageTransaction(tx.getID(), this, depages);
@@ -187,7 +194,7 @@
public void afterCommit(Transaction tx)
{
- pgToUpdate.update(depages, storageManager);
+ pgToUpdate.update(depages, storageManager, pagingManager);
}
});
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.impl;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -155,7 +156,16 @@
{
return transactions.get(id);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return transactions;
+ }
+
// HornetQComponent implementation
// ------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -1059,7 +1059,7 @@
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
- entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
+ entry.getKey().storeUpdate(storageManager, this.pagingManager, depageTransaction, entry.getValue().intValue());
}
depageTransaction.commit();
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -913,7 +913,7 @@
PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
- pageTX.update(pageUpdate.recods, null);
+ pageTX.update(pageUpdate.recods, null, null);
}
else
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -1269,7 +1269,7 @@
store.sync();
}
- pageTransaction.store(storageManager, tx);
+ pageTransaction.store(storageManager, pagingManager, tx);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -34,6 +34,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -163,6 +164,10 @@
for (int i = 0; i < numberOfMessages; i++)
{
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
message = session.createMessage(true);
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -240,6 +245,8 @@
}
consumer.close();
+
+ session.close();
}
catch (Throwable e)
{
@@ -260,6 +267,9 @@
{
threads[i].join();
}
+
+ assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
+
}
finally
{
@@ -812,6 +822,136 @@
}
+
+ public void testDepageDuringTransaction4() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.getConfiguration().setJournalSyncNonTransactional(false);
+ server.getConfiguration().setJournalSyncTransactional(false);
+
+ server.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int messageSize = 1024; // 1k
+ final int numberOfMessages = 10000;
+
+ try
+ {
+ final ClientSessionFactory sf = createInVMFactory();
+
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(false);
+
+ final byte[] body = new byte[messageSize];
+
+
+ Thread producerThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(false, false);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 50 == 0 && i != 0)
+ {
+ sessionProducer.commit();
+ //Thread.sleep(500);
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
+ };
+
+ ClientSession session = sf.createSession(true, true, 0);
+ session.start();
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ producerThread.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ if (i > 0 && i % 10 == 0)
+ {
+ //session.commit();
+ }
+ }
+ //session.commit();
+
+ session.close();
+
+ producerThread.join();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -688,6 +688,154 @@
Assert.assertEquals(0, storeImpl.getAddressSize());
}
+ public void testOrderOnPaging() throws Throwable
+ {
+ clearData();
+ SequentialFileFactory factory = new NIOSequentialFileFactory(this.getPageDir());
+
+ PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+ final int MAX_SIZE = 1024 * 10;
+
+ AddressSettings settings = new AddressSettings();
+ settings.setPageSizeBytes(MAX_SIZE);
+ settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ storeFactory,
+ new SimpleString("test"),
+ settings,
+ executor,
+ true);
+
+ storeImpl.start();
+
+ Assert.assertEquals(0, storeImpl.getNumberOfPages());
+
+ // Marked the store to be paged
+ storeImpl.startPaging();
+
+ final CountDownLatch producedLatch = new CountDownLatch(1);
+
+ Assert.assertEquals(1, storeImpl.getNumberOfPages());
+
+ final SimpleString destination = new SimpleString("test");
+
+ final long NUMBER_OF_MESSAGES = 100000;
+
+ final List<Throwable> errors = new ArrayList<Throwable>();
+
+ class WriterThread extends Thread
+ {
+
+ public WriterThread()
+ {
+ super("PageWriter");
+ }
+
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ for (long i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ // Each thread will Keep paging until all the messages are depaged.
+ // This is possible because the depage thread is not actually reading the pages.
+ // Just using the internal API to remove it from the page file system
+ ServerMessage msg = createMessage(i, storeImpl, destination, createRandomBuffer(i, 1024));
+ msg.putLongProperty("count", i);
+ while (!storeImpl.page(msg))
+ {
+ storeImpl.startPaging();
+ }
+
+ if (i == 0)
+ {
+ producedLatch.countDown();
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ }
+ }
+
+ class ReaderThread extends Thread
+ {
+ public ReaderThread()
+ {
+ super("PageReader");
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ long msgsRead = 0;
+
+ while (msgsRead < NUMBER_OF_MESSAGES)
+ {
+ Page page = storeImpl.depage();
+ if (page != null)
+ {
+ page.open();
+ List<PagedMessage> messages = page.read();
+
+ for (PagedMessage pgmsg : messages)
+ {
+ ServerMessage msg = pgmsg.getMessage(null);
+
+ assertEquals(msgsRead++, msg.getMessageID());
+
+ assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue());
+ }
+
+ page.close();
+ page.delete();
+ }
+ else
+ {
+ System.out.println("Depaged!!!!");
+ Thread.sleep(500);
+ }
+ }
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ }
+ }
+
+ WriterThread producerThread = new WriterThread();
+ producerThread.start();
+ ReaderThread consumer = new ReaderThread();
+ consumer.start();
+
+ producerThread.join();
+ consumer.join();
+
+ storeImpl.stop();
+
+ for (Throwable e: errors)
+ {
+ throw e;
+ }
+ }
+
/**
* @return
*/
@@ -878,6 +1026,15 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
class FakeStorageManager implements StorageManager
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-09-13 22:39:14 UTC (rev 9680)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-09-13 22:45:10 UTC (rev 9681)
@@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,7 +36,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
@@ -312,6 +312,14 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return null;
+ }
+
}
}
13 years, 9 months
JBoss hornetq SVN: r9680 - in branches/Branch_2_1: src/main/org/hornetq/core/paging/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-13 18:39:14 -0400 (Mon, 13 Sep 2010)
New Revision: 9680
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
Fixing a leak on PageTransaction and adding tests on ordering for paging
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -40,12 +40,12 @@
long getTransactionID();
- void store(StorageManager storageManager,Transaction tx) throws Exception;
+ void store(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
- void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
+ void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
// To be used after the update was stored or reload
- void update(int update, StorageManager storageManager);
+ void update(int update, StorageManager storageManager, PagingManager pagingManager);
void increment();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/PagingManager.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging;
+import java.util.Map;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.postoffice.PostOffice;
@@ -67,6 +69,8 @@
* @param transactionID
*/
void removeTransaction(long transactionID);
+
+ Map<Long, PageTransactionInfo> getTransactions();
/**
* Reload previously created PagingStores into memory
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -56,6 +57,7 @@
public PageTransactionInfoImpl(final long transactionID)
{
+ this();
this.transactionID = transactionID;
countDownCompleted = new CountDownLatch(1);
}
@@ -81,7 +83,7 @@
return transactionID;
}
- public void update(final int update, final StorageManager storageManager)
+ public void update(final int update, final StorageManager storageManager, PagingManager pagingManager)
{
int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
if (sizeAfterUpdate == 0 && storageManager != null)
@@ -95,6 +97,11 @@
log.warn("Can't delete page transaction id=" + this.recordID);
}
}
+
+ if (sizeAfterUpdate == 0 && pagingManager != null)
+ {
+ pagingManager.removeTransaction(this.transactionID);
+ }
}
public void increment()
@@ -149,7 +156,7 @@
}
}
- public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+ public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
{
storageManager.storePageTransaction(tx.getID(), this);
}
@@ -157,7 +164,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
*/
- public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+ public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int depages) throws Exception
{
storageManager.updatePageTransaction(tx.getID(), this, depages);
@@ -187,7 +194,7 @@
public void afterCommit(Transaction tx)
{
- pgToUpdate.update(depages, storageManager);
+ pgToUpdate.update(depages, storageManager, pagingManager);
}
});
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.impl;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -155,7 +156,16 @@
{
return transactions.get(id);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return transactions;
+ }
+
// HornetQComponent implementation
// ------------------------------------------------------------------------------------------------
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -1059,7 +1059,7 @@
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
- entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
+ entry.getKey().storeUpdate(storageManager, this.pagingManager, depageTransaction, entry.getValue().intValue());
}
depageTransaction.commit();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -913,7 +913,7 @@
PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
- pageTX.update(pageUpdate.recods, null);
+ pageTX.update(pageUpdate.recods, null, null);
}
else
{
Modified: branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -1267,7 +1267,7 @@
store.sync();
}
- pageTransaction.store(storageManager, tx);
+ pageTransaction.store(storageManager, pagingManager, tx);
}
}
}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -34,6 +34,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -163,6 +164,10 @@
for (int i = 0; i < numberOfMessages; i++)
{
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
message = session.createMessage(true);
HornetQBuffer bodyLocal = message.getBodyBuffer();
@@ -240,6 +245,8 @@
}
consumer.close();
+
+ session.close();
}
catch (Throwable e)
{
@@ -260,6 +267,9 @@
{
threads[i].join();
}
+
+ assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
+
}
finally
{
@@ -812,6 +822,136 @@
}
+
+ public void testDepageDuringTransaction4() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.getConfiguration().setJournalSyncNonTransactional(false);
+ server.getConfiguration().setJournalSyncTransactional(false);
+
+ server.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int messageSize = 1024; // 1k
+ final int numberOfMessages = 10000;
+
+ try
+ {
+ final ClientSessionFactory sf = createInVMFactory();
+
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(false);
+
+ final byte[] body = new byte[messageSize];
+
+
+ Thread producerThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(false, false);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 50 == 0 && i != 0)
+ {
+ sessionProducer.commit();
+ //Thread.sleep(500);
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
+ };
+
+ ClientSession session = sf.createSession(true, true, 0);
+ session.start();
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ producerThread.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ if (i > 0 && i % 10 == 0)
+ {
+ //session.commit();
+ }
+ }
+ //session.commit();
+
+ session.close();
+
+ producerThread.join();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -688,6 +688,154 @@
Assert.assertEquals(0, storeImpl.getAddressSize());
}
+ public void testOrderOnPaging() throws Throwable
+ {
+ clearData();
+ SequentialFileFactory factory = new NIOSequentialFileFactory(this.getPageDir());
+
+ PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+ final int MAX_SIZE = 1024 * 10;
+
+ AddressSettings settings = new AddressSettings();
+ settings.setPageSizeBytes(MAX_SIZE);
+ settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ storeFactory,
+ new SimpleString("test"),
+ settings,
+ executor,
+ true);
+
+ storeImpl.start();
+
+ Assert.assertEquals(0, storeImpl.getNumberOfPages());
+
+ // Marked the store to be paged
+ storeImpl.startPaging();
+
+ final CountDownLatch producedLatch = new CountDownLatch(1);
+
+ Assert.assertEquals(1, storeImpl.getNumberOfPages());
+
+ final SimpleString destination = new SimpleString("test");
+
+ final long NUMBER_OF_MESSAGES = 100000;
+
+ final List<Throwable> errors = new ArrayList<Throwable>();
+
+ class WriterThread extends Thread
+ {
+
+ public WriterThread()
+ {
+ super("PageWriter");
+ }
+
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ for (long i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ // Each thread will Keep paging until all the messages are depaged.
+ // This is possible because the depage thread is not actually reading the pages.
+ // Just using the internal API to remove it from the page file system
+ ServerMessage msg = createMessage(i, storeImpl, destination, createRandomBuffer(i, 1024));
+ msg.putLongProperty("count", i);
+ while (!storeImpl.page(msg))
+ {
+ storeImpl.startPaging();
+ }
+
+ if (i == 0)
+ {
+ producedLatch.countDown();
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ }
+ }
+
+ class ReaderThread extends Thread
+ {
+ public ReaderThread()
+ {
+ super("PageReader");
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+
+ long msgsRead = 0;
+
+ while (msgsRead < NUMBER_OF_MESSAGES)
+ {
+ Page page = storeImpl.depage();
+ if (page != null)
+ {
+ page.open();
+ List<PagedMessage> messages = page.read();
+
+ for (PagedMessage pgmsg : messages)
+ {
+ ServerMessage msg = pgmsg.getMessage(null);
+
+ assertEquals(msgsRead++, msg.getMessageID());
+
+ assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue());
+ }
+
+ page.close();
+ page.delete();
+ }
+ else
+ {
+ System.out.println("Depaged!!!!");
+ Thread.sleep(500);
+ }
+ }
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.add(e);
+ }
+ }
+ }
+
+ WriterThread producerThread = new WriterThread();
+ producerThread.start();
+ ReaderThread consumer = new ReaderThread();
+ consumer.start();
+
+ producerThread.join();
+ consumer.join();
+
+ storeImpl.stop();
+
+ for (Throwable e: errors)
+ {
+ throw e;
+ }
+ }
+
/**
* @return
*/
@@ -878,6 +1026,15 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
class FakeStorageManager implements StorageManager
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-09-13 21:33:55 UTC (rev 9679)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-09-13 22:39:14 UTC (rev 9680)
@@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,7 +36,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
@@ -312,6 +312,14 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return null;
+ }
+
}
}
13 years, 9 months