Author: ritchiem
Date: 2008-04-16 12:14:16 -0400 (Wed, 16 Apr 2008)
New Revision: 1923
Added:
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java
Modified:
store/branches/java/M2.x/java/bdbstore/pom.xml
Log:
Added test for the race condition present between BDBStore and the Qpid Housekeeping thread.
Modified: store/branches/java/M2.x/java/bdbstore/pom.xml
===================================================================
--- store/branches/java/M2.x/java/bdbstore/pom.xml 2008-04-16 15:38:14 UTC (rev 1922)
+++ store/branches/java/M2.x/java/bdbstore/pom.xml 2008-04-16 16:14:16 UTC (rev 1923)
@@ -32,6 +32,10 @@
<version>1.0-incubating-M2.1-SNAPSHOT</version>
</parent>
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ </properties>
+
<!-- Local repository for the BerkelyDB-je so we don't have to use the
installer script -->
<repositories>
<repository>
@@ -78,6 +82,25 @@
<build>
<plugins>
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>QPID_HOME</name>
+ <!-- Read by StoreContextRaceConditionTest.setUp() to locate etc/persistent_config.xml -->
<value>${basedir}/${topDirectoryLocation}/broker</value>
+ </property>
+ </systemProperties>
+ <excludes>
+ <!-- Kept out of the normal surefire run. NOTE(review): presumably because its test() loops until the process is stopped - confirm -->
<exclude>**/StoreContextRaceConditionTest.class</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+
<!-- Produce an all dependencies jar, to be used for running the hot
backup utility as a standalone tool. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Added:
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java
===================================================================
---
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java
(rev 0)
+++
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java 2008-04-16
16:14:16 UTC (rev 1923)
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.etp.qpid.server;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import java.io.File;
+
+public class StoreContextRaceConditionTest extends VMTestCase
+{
+ private static final Logger _logger =
Logger.getLogger(StoreContextRaceConditionTest.class);
+ private long _TimeToLive = 0L; // time-to-live handed to the producer in sendMessages(Connection, int); reassigned by test() between sends
+ private static long SECOND = 1000L; // presumably one second expressed in milliseconds; used to scale the housekeeping period in setUp()
+ // Name of the system property that setUp() overrides with "WARN" and tearDown() restores.
+ private static final String LOGGING_KEY = "amqj.logging.level";
+ // Value LOGGING_KEY had when setUp() ran; null when the property was not set at that point.
+ private String _loggingOriginal;
+
+ /**
+  * Prepares the broker used by this test.
+  * <p>
+  * Loads <code>etc/persistent_config.xml</code> from the directory named by the QPID_HOME system
+  * property, sets the expired-message check period of the "test" virtual host to 10 * SECOND,
+  * disables management, initialises ApplicationRegistry instance 1 with that configuration, closes
+  * the "localhost" and "development" virtual hosts and finally delegates to the superclass setUp()
+  * before looking up the "queue" entry from the JNDI context.
+  *
+  * @throws Exception if the configuration cannot be loaded or the broker cannot be started
+  */
+ public void setUp() throws Exception
+ {
+     //Disable the logging
+     _loggingOriginal = System.getProperty(LOGGING_KEY);
+     System.setProperty(LOGGING_KEY, "WARN");
+
+     // Validate QPID_HOME before it is used to build the configuration path, so that a missing
+     // property is reported as such rather than as a missing configuration file.
+     final String qpidHome = System.getProperty("QPID_HOME");
+     if (qpidHome == null)
+     {
+         fail("QPID_HOME not set");
+     }
+
+     // Locate the persistent broker configuration under QPID_HOME.
+     final File defaultaclConfigFile = new File(qpidHome, "etc/persistent_config.xml");
+     if (!defaultaclConfigFile.exists())
+     {
+         System.err.println("Configuration file not found:" + defaultaclConfigFile);
+         fail("Configuration file not found:" + defaultaclConfigFile);
+     }
+
+     Configuration configuration = ConfigurationFileApplicationRegistry.config(defaultaclConfigFile);
+
+     //Reset the housekeeping check period of the "test" virtual host to 10 * SECOND.
+     final String housekeepingPeriodKey =
+             "virtualhosts.virtualhost.test.housekeeping.expiredMessageCheckPeriod";
+     configuration.setProperty(housekeepingPeriodKey, 10 * SECOND);
+     configuration.setProperty("management.enabled", "false");
+
+     _logger.info("Reset Housekeeping to :" + configuration.getLong(housekeepingPeriodKey));
+
+     ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configuration);
+
+     ApplicationRegistry.initialise(config, 1);
+
+     //Remove the Vhosts we are not using to free up CPU from extra housekeeping threads.
+     VirtualHostRegistry vHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
+     vHostRegistry.getVirtualHost("localhost").close();
+     vHostRegistry.getVirtualHost("development").close();
+
+     //Create the Broker
+     super.setUp();
+
+     _queue = (Queue) _context.lookup("queue");
+ }
+
+ /**
+  * Puts the logging-level system property back to the state captured by setUp().
+  * <p>
+  * When the property was not set before setUp() ran, it is cleared again rather than restored:
+  * System.setProperty rejects a null value with a NullPointerException.
+  * <p>
+  * NOTE(review): the superclass tearDown() is not invoked here, so whatever super.setUp() started
+  * is presumably left running - confirm whether that is intended.
+  */
+ public void tearDown()
+ {
+     if (_loggingOriginal == null)
+     {
+         System.clearProperty(LOGGING_KEY);
+     }
+     else
+     {
+         System.setProperty(LOGGING_KEY, _loggingOriginal);
+     }
+ }
+
+ protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty"; // int property set on every message sent by sendMessages(Connection, int)
+ // Destination used by all producers and consumers in this test; assigned in setUp() from the JNDI "queue" entry.
+ protected Queue _queue;
+
+ protected void sendMessages(int num) throws JMSException
+ {
+ Connection producerConnection = null;
+ try
+ {
+ producerConnection = ((ConnectionFactory)
_context.lookup("connection")).createConnection();
+ }
+ catch (NamingException e)
+ {
+ fail("Unable to lookup connection in JNDI.");
+ }
+
+ sendMessages(producerConnection, num);
+ }
+
+ /**
+  * Sends <code>num</code> text messages to the test queue over the given connection and then
+  * closes that connection.
+  * <p>
+  * Each message body is "Message " followed by its index, and the index is also stored in the
+  * MESSAGE_ID_PROPERTY int property. The producer uses the current value of _TimeToLive.
+  * The connection is closed even when sending fails, so a failed send does not leak it.
+  *
+  * @param producerConnection connection to send on; always closed before this method returns
+  * @param num number of messages to send
+  * @throws JMSException if any JMS operation fails
+  */
+ protected void sendMessages(Connection producerConnection, int num) throws JMSException
+ {
+     try
+     {
+         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         //Ensure _queue is created
+         producerSession.createConsumer(_queue).close();
+
+         MessageProducer producer = producerSession.createProducer(_queue);
+
+         producer.setTimeToLive(_TimeToLive);
+         producer.setDisableMessageTimestamp(false);
+
+         for (int messageID = 0; messageID < num; messageID++)
+         {
+             TextMessage textMsg = producerSession.createTextMessage("Message " + messageID);
+             textMsg.setIntProperty(MESSAGE_ID_PROPERTY, messageID);
+             producer.send(textMsg);
+         }
+     }
+     finally
+     {
+         producerConnection.close();
+     }
+ }
+
+ /**
+  * Repeats a consume/expire/purge scenario in a worker thread in an attempt to provoke the race
+  * condition between the BDB store and the queue housekeeping thread.
+  * <p>
+  * Each iteration stops the connection, creates a consumer, sends one non-expiring message followed
+  * by 50 messages with a 1000 ms time-to-live, sleeps for a second, restarts the connection, receives
+  * one message, sends 50 more messages, closes the consumer and sends a final 50.
+  * <p>
+  * The worker does not finish of its own accord: a JMS failure is logged and the scenario is retried,
+  * and only an interrupt of the worker thread ends the loop. This method blocks until every worker
+  * has ended.
+  * <p>
+  * NOTE(review): _TimeToLive is an instance field shared by all workers; that is only safe while
+  * MAX_THREADS is 1.
+  *
+  * @throws InterruptedException if the calling thread is interrupted while waiting for a worker
+  * @throws NamingException declared for callers; not raised by the code in this method
+  * @throws JMSException declared for callers; not raised by the code in this method
+  */
+ public void test() throws InterruptedException, NamingException, JMSException
+ {
+     Runnable test = new Runnable()
+     {
+         public void run()
+         {
+             //Create Consumer
+             Connection connection = null;
+
+             Session session = null;
+             try
+             {
+                 try
+                 {
+                     connection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+                 }
+                 catch (NamingException e)
+                 {
+                     fail("Unable to lookup connection in JNDI.");
+                 }
+                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             }
+             catch (JMSException e)
+             {
+                 _logger.warn("Unable to create the consumer connection or session", e);
+                 // Do not leak the connection when only the session could not be created.
+                 if (connection != null)
+                 {
+                     try
+                     {
+                         connection.close();
+                     }
+                     catch (JMSException closeFailure)
+                     {
+                         _logger.warn("Unable to close the consumer connection", closeFailure);
+                     }
+                 }
+                 return;
+             }
+
+             try
+             {
+                 int run = 1;
+                 while (true)
+                 {
+                     try
+                     {
+                         //Stop the connection to prevent flow
+                         connection.stop();
+                         //Create Consumer to receive msgs
+                         MessageConsumer consumer = session.createConsumer(_queue);
+
+                         //Send one message to hold up the Async Delivery from purging
+                         _logger.info("***** CREATED Consumer");
+                         _TimeToLive = 0L;
+                         sendMessages(1);
+                         _logger.info("***** SENT msg 1");
+                         //Send 50 msgs with a 1000 ms time-to-live that will time out
+                         _TimeToLive = 1000L;
+                         sendMessages(50);
+                         _logger.info("***** SENT TTL msgs");
+
+                         //Wait out the time-to-live so the messages just sent have expired
+                         Thread.sleep(1000);
+                         _logger.info("***** SLEEP");
+
+                         //Allow the messages to flow to us
+                         connection.start();
+                         _logger.info("***** START Consumer");
+                         //*** Starts Async process
+
+                         //Remove the first message so that the async will occur and start purging.
+                         consumer.receive(1000);
+                         _logger.info("***** RECEIVE Consumer");
+
+                         sendMessages(50);
+                         _logger.info("***** SENT TTL msgs");
+
+                         //Close the consumer freeing the QHK thread to do work
+                         consumer.close();
+                         _logger.info("***** CLOSE Consumer");
+                         //** Allows QueueHousekeeping to run.
+                         sendMessages(50);
+                         _logger.info("***** SENT TTL msgs");
+
+                         System.err.println("********************************* Running test again (" + run +
+                                            ")in attempt to cause race condition.");
+                         run++;
+                     }
+                     catch (JMSException e)
+                     {
+                         // Keep retrying as before, but make the failure visible.
+                         _logger.warn("JMS failure during run " + run + ", retrying", e);
+                     }
+                     catch (InterruptedException e)
+                     {
+                         // Restore the flag and stop; swallowing the interrupt made the loop unstoppable.
+                         Thread.currentThread().interrupt();
+                         return;
+                     }
+                 }
+             }
+             finally
+             {
+                 try
+                 {
+                     connection.close();
+                 }
+                 catch (JMSException e)
+                 {
+                     _logger.warn("Unable to close the consumer connection", e);
+                 }
+                 _logger.info("***** Test Done");
+             }
+         }
+     };
+
+     int MAX_THREADS = 1;
+
+     Thread[] threads = new Thread[MAX_THREADS];
+
+     for (int concurrentClients = 0; concurrentClients < MAX_THREADS; concurrentClients++)
+     {
+         threads[concurrentClients] = new Thread(test);
+         threads[concurrentClients].start();
+     }
+
+     for (int concurrentClients = 0; concurrentClients < MAX_THREADS; concurrentClients++)
+     {
+         threads[concurrentClients].join();
+     }
+ }
+
+ /**
+  * Stand-alone entry point: builds the test, runs setUp() and then test() outside of a test runner.
+  *
+  * @param args ignored
+  * @throws Exception if setUp() or test() fails
+  */
+ public static void main(String[] args) throws Exception, InterruptedException
+ {
+     StoreContextRaceConditionTest raceTest = new StoreContextRaceConditionTest();
+     raceTest.setUp();
+     raceTest.test();
+     // tearDown() is intentionally not invoked from here (the call was left disabled).
+ }
+
+}
Property changes on:
store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native