From do-not-reply at jboss.org Tue Sep 14 19:36:33 2010 Content-Type: multipart/mixed; boundary="===============1481181551823484277==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r9686 - in trunk/tests/src/org/hornetq/tests/soak: client and 1 other directory. Date: Tue, 14 Sep 2010 19:36:33 -0400 Message-ID: <201009142336.o8ENaX0V007294@svn01.web.mwc.hst.phx2.redhat.com> --===============1481181551823484277== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: clebert.suconic(a)jboss.com Date: 2010-09-14 19:36:32 -0400 (Tue, 14 Sep 2010) New Revision: 9686 Added: trunk/tests/src/org/hornetq/tests/soak/client/ trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java trunk/tests/src/org/hornetq/tests/soak/client/Sender.java Log: Adding test to replicate leakage Added: trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java = (rev 0) +++ trunk/tests/src/org/hornetq/tests/soak/client/ClientAbstract.java 2010-= 09-14 23:36:32 UTC (rev 9686) @@ -0,0 +1,227 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.tests.soak.client; + +import java.util.logging.Logger; + +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.core.transaction.impl.XidImpl; +import org.hornetq.utils.UUIDGenerator; + +/** + * WARNING: This is not a sample on how you should handle XA. + * You are supposed to use a TransactionManager. + * This class is doing the job of a TransactionManager that fits = for the purpose of this test only, + * however there are many more pitfalls to deal with Transactions= . = + * = + * This is just to stress and soak test Transactions with HornetQ= . = + * = + * And this is dealing with XA directly for the purpose testing o= nly. + * + * @author Clebert Suconic<= /a> + * + * + */ +public abstract class ClientAbstract extends Thread +{ + + // Constants ----------------------------------------------------- + private static final Logger log =3D Logger.getLogger(ClientAbstract.cla= ss.getName()); + + // Attributes ---------------------------------------------------- + + protected ClientSession session; + + protected final ClientSessionFactory sf; + = + protected Xid activeXid; + + protected volatile boolean running =3D true; + + protected volatile int errors =3D 0; + + /** + * A commit was called + * case we don't find the Xid, means it was accepted + */ + protected volatile boolean pendingCommit =3D false; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + = + public ClientAbstract(ClientSessionFactory sf) + { + this.sf =3D sf; + } + + // Public -------------------------------------------------------- + + public ClientSession getConnection() + { + return session; + } + + public int getErrorsCount() + { + return errors; + } + + public final void connect() + { + while (running) + { + try + { + disconnect(); + + session =3D sf.createXASession(); + + if (activeXid !=3D null) + { + synchronized (ClientAbstract.class) + { + Xid[] xids =3D session.recover(XAResource.TMSTARTRSCAN); + boolean found =3D false; + for (Xid recXid : xids) + { + if (recXid.equals(activeXid)) + { + // System.out.println("Calling commit after a prep= are on " + this); + found =3D true; + callCommit(); + } + } + + if (!found) + { + if (pendingCommit) + { + onCommit(); + } + else + { + onRollback(); + } + + activeXid =3D null; + pendingCommit =3D false; + } + } + } + + connectClients(); + + break; + } + catch (Exception e) + { + ClientAbstract.log.warning("Can't connect to server, retrying"= ); + disconnect(); + try + { + Thread.sleep(1000); + } + catch (InterruptedException ignored) + { + // if an interruption was sent, we will respect it and leav= e the loop + break; + } + } + } + } + + @Override + public void run() + { + connect(); + } + + protected void callCommit() throws Exception + { + pendingCommit =3D true; + session.commit(activeXid, false); + pendingCommit =3D false; + activeXid =3D null; + onCommit(); + } + + protected void callPrepare() throws Exception + { + session.prepare(activeXid); + } + + public void beginTX() throws Exception + { + activeXid =3D newXID(); + + session.start(activeXid, XAResource.TMNOFLAGS); + } + + public void endTX() throws Exception + { + session.end(activeXid, XAResource.TMSUCCESS); + callPrepare(); + callCommit(); + } + + public void setRunning(final boolean running) + { + this.running =3D running; + } + + /** + * @return + */ + private XidImpl newXID() + { + return new XidImpl("tst".getBytes(), 1, UUIDGenerator.getInstance().= generateStringUUID().getBytes()); + } + + protected abstract void connectClients() throws Exception; + + protected abstract void onCommit(); + + protected abstract void onRollback(); + + public void disconnect() + { + try + { + if (session !=3D null) + { + session.close(); + } + } + catch (Exception ignored) + { + ignored.printStackTrace(); + } + + session =3D null; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} Added: trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java = (rev 0) +++ trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java 2010-= 09-14 23:36:32 UTC (rev 9686) @@ -0,0 +1,192 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.tests.soak.client; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.DivertConfiguration; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.tests.util.ServiceTestBase; + +/** + * A ClientSoakTest + * + * @author Clebert Suconic<= /a> + * + * + */ +public class ClientSoakTest extends ServiceTestBase +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private static final SimpleString ADDRESS =3D new SimpleString("ADD"); + + private static final SimpleString DIVERTED_AD1 =3D ClientSoakTest.ADDRE= SS.concat("-1"); + + private static final SimpleString DIVERTED_AD2 =3D ClientSoakTest.ADDRE= SS.concat("-2"); + + private static final boolean IS_NETTY =3D true; + + private static final boolean IS_JOURNAL =3D true; + + public static final int MIN_MESSAGES_ON_QUEUE =3D 5000; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + private HornetQServer server; + + @Override + protected void setUp() throws Exception + { + clearData(); + + Configuration config =3D createDefaultConfig(ClientSoakTest.IS_NETTY= ); + + config.setJournalFileSize(10 * 1024 * 1024); + + server =3D createServer(IS_JOURNAL, config, -1, -1, new HashMap()); + + DivertConfiguration divert1 =3D new DivertConfiguration("dv1", + "nm1", + ClientSoakTest= .ADDRESS.toString(), + ClientSoakTest= .DIVERTED_AD1.toString(), + true, + null, + null); + + DivertConfiguration divert2 =3D new DivertConfiguration("dv2", + "nm2", + ClientSoakTest= .ADDRESS.toString(), + ClientSoakTest= .DIVERTED_AD2.toString(), + true, + null, + null); + + ArrayList divertList =3D new ArrayList(); + divertList.add(divert1); + divertList.add(divert2); + + config.setDivertConfigurations(divertList); + + server.start(); + + ClientSessionFactory sf =3D createFactory(ClientSoakTest.IS_NETTY); + + ClientSession session =3D sf.createSession(); + + session.createQueue(ClientSoakTest.ADDRESS, ClientSoakTest.ADDRESS, = true); + + session.createQueue(ClientSoakTest.DIVERTED_AD1, ClientSoakTest.DIVE= RTED_AD1, true); + + session.createQueue(ClientSoakTest.DIVERTED_AD2, ClientSoakTest.DIVE= RTED_AD2, true); + + session.close(); + + sf.close(); + + } + + @Override + protected void tearDown() throws Exception + { + server.stop(); + server =3D null; + } + + public void testSoakClient() throws Exception + { + final ClientSessionFactory sf =3D createFactory(IS_NETTY); + + ClientSession session =3D sf.createSession(false, false); + + ClientProducer producer =3D session.createProducer(ADDRESS); + + for (int i =3D 0; i < MIN_MESSAGES_ON_QUEUE; i++) + { + ClientMessage msg =3D session.createMessage(true); + msg.putLongProperty("count", i); + msg.getBodyBuffer().writeBytes(new byte[10 * 1024]); + producer.send(msg); + + if (i % 1000 =3D=3D 0) + { + System.out.println("Sent " + i + " messages"); + session.commit(); + } + } + + session.commit(); + + session.close(); + sf.close(); + + Receiver rec1 =3D new Receiver(createFactory(IS_NETTY), DIVERTED_AD1= .toString()); + Receiver rec2 =3D new Receiver(createFactory(IS_NETTY), DIVERTED_AD2= .toString()); + + Sender send =3D new Sender(createFactory(IS_NETTY), ADDRESS.toString= (), new Receiver[] { rec1, rec2 }); + + send.start(); + rec1.start(); + rec2.start(); + + long timeEnd =3D System.currentTimeMillis() + TimeUnit.HOURS.toMilli= s(1); + while (timeEnd > System.currentTimeMillis()) + { + if (send.getErrorsCount() !=3D 0 || rec1.getErrorsCount() !=3D 0 = || rec2.getErrorsCount() !=3D 0) + { + System.out.println("There are sequence errors in some of the c= lients, please look at the logs"); + break; + } + Thread.sleep(10000); + } + + send.setRunning(false); + rec1.setRunning(false); + rec2.setRunning(false); + + send.join(); + rec1.join(); + rec2.join(); + + assertEquals(0, send.getErrorsCount()); + assertEquals(0, rec1.getErrorsCount()); + assertEquals(0, rec2.getErrorsCount()); + + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} Added: trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java = (rev 0) +++ trunk/tests/src/org/hornetq/tests/soak/client/Receiver.java 2010-09-14 = 23:36:32 UTC (rev 9686) @@ -0,0 +1,185 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.tests.soak.client; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.hornetq.api.core.client.ClientConsumer; +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.utils.ReusableLatch; + +/** + * A Receiver + * + * @author Clebert Suconic<= /a> + * + * + */ +public class Receiver extends ClientAbstract +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + = + // We should leave some messages on paging. We don't want to consume al= l for this test + private final Semaphore minConsume =3D new Semaphore(0); + = + private final ReusableLatch latchMax =3D new ReusableLatch(0); + = + private static final int MAX_DIFF =3D 10000; + = + // The difference between producer and consuming + private final AtomicInteger currentDiff =3D new AtomicInteger(0); + = + private final String queue; + = + protected long msgs =3D 0; + = + protected int pendingMsgs =3D 0; + = + protected int pendingSemaphores =3D 0; + = + protected ClientConsumer cons; + + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public Receiver(ClientSessionFactory sf, String queue) + { + super(sf); + this.queue =3D queue; + } + = + // Public -------------------------------------------------------- + + public void run() + { + super.run(); + = + while (running) + { + try + { + beginTX(); + = + for (int i =3D 0 ; i < 1000; i++) + { + ClientMessage msg =3D cons.receive(5000); + if (msg =3D=3D null) + { + break; + } + = + msg.acknowledge(); + = + if (msg.getLongProperty("count") !=3D msgs + pendingMsgs) + { + errors++; + System.out.println("count should be " + (msgs + pendingM= sgs) + " when it was " + msg.getLongProperty("count") + " on " + queue); + } + = + pendingMsgs++; + if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS)) + { + break; + } + = + } + = + endTX(); + } + catch (Exception e) + { + connect(); + } + = + = + } + } + = + /* (non-Javadoc) + * @see org.hornetq.jms.example.ClientAbstract#connectClients() + */ + @Override + protected void connectClients() throws Exception + { + = + cons =3D session.createConsumer(queue); + = + session.start(); + } + + /* (non-Javadoc) + * @see org.hornetq.jms.example.ClientAbstract#onCommit() + */ + @Override + protected void onCommit() + { + msgs +=3D pendingMsgs; + this.currentDiff.addAndGet(-pendingMsgs); + latchMax.countDown(pendingMsgs); + pendingMsgs =3D 0; + } + + /* (non-Javadoc) + * @see org.hornetq.jms.example.ClientAbstract#onRollback() + */ + @Override + protected void onRollback() + { + minConsume.release(pendingMsgs); + pendingMsgs =3D 0; + } + = + public String toString() + { + return "Receiver::" + this.queue + ", msgs=3D" + msgs + ", pending= =3D" + pendingMsgs; + } + + /** + * @param pendingMsgs2 + */ + public void messageProduced(int producedMessages) + { + minConsume.release(producedMessages); + currentDiff.addAndGet(producedMessages); + if (currentDiff.get() > MAX_DIFF) + { + latchMax.setCount(currentDiff.get() - MAX_DIFF); + try + { + latchMax.await(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} Added: trunk/tests/src/org/hornetq/tests/soak/client/Sender.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/soak/client/Sender.java = (rev 0) +++ trunk/tests/src/org/hornetq/tests/soak/client/Sender.java 2010-09-14 23= :36:32 UTC (rev 9686) @@ -0,0 +1,126 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.tests.soak.client; + +import org.hornetq.api.core.client.ClientMessage; +import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.client.ClientSessionFactory; + +/** + * A Sender + * + * @author Clebert Suconic<= /a> + * + * + */ +public class Sender extends ClientAbstract +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + protected ClientProducer producer; + + protected String queue; + + protected long msgs =3D ClientSoakTest.MIN_MESSAGES_ON_QUEUE; + + protected int pendingMsgs =3D 0; + + protected final Receiver[] receivers; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public Sender(final ClientSessionFactory sf, String queue, final Receiv= er[] receivers) + { + super(sf); + this.receivers =3D receivers; + this.queue =3D queue; + } + + @Override + protected void connectClients() throws Exception + { + producer =3D session.createProducer(queue); + } + + public void run() + { + super.run(); + while (running) + { + try + { + beginTX(); + for (int i =3D 0; i < 1000; i++) + { + ClientMessage msg =3D session.createMessage(true); + msg.putLongProperty("count", pendingMsgs + msgs); + msg.getBodyBuffer().writeBytes(new byte[10 * 1024]); + producer.send(msg); + pendingMsgs++; + } + endTX(); + } + catch (Exception e) + { + connect(); + } + } + } + + /* (non-Javadoc) + * @see org.hornetq.jms.example.ClientAbstract#onCommit() + */ + @Override + protected void onCommit() + { + this.msgs +=3D pendingMsgs; + for (Receiver rec : receivers) + { + rec.messageProduced(pendingMsgs); + } + + pendingMsgs =3D 0; + } + + /* (non-Javadoc) + * @see org.hornetq.jms.example.ClientAbstract#onRollback() + */ + @Override + protected void onRollback() + { + pendingMsgs =3D 0; + } + + public String toString() + { + return "Sender, msgs=3D" + msgs + ", pending=3D" + pendingMsgs; + + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} --===============1481181551823484277==--