[rhmessaging-commits] rhmessaging commits: r1923 - in store/branches/java/M2.x/java/bdbstore: src/test/java/org and 3 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Apr 16 12:14:16 EDT 2008


Author: ritchiem
Date: 2008-04-16 12:14:16 -0400 (Wed, 16 Apr 2008)
New Revision: 1923

Added:
   store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/
   store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/
   store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/
   store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java
Modified:
   store/branches/java/M2.x/java/bdbstore/pom.xml
Log:
Added test for RaceCondtion in present between BDBStore and Qpid Housekeeping thread.

Modified: store/branches/java/M2.x/java/bdbstore/pom.xml
===================================================================
--- store/branches/java/M2.x/java/bdbstore/pom.xml	2008-04-16 15:38:14 UTC (rev 1922)
+++ store/branches/java/M2.x/java/bdbstore/pom.xml	2008-04-16 16:14:16 UTC (rev 1923)
@@ -32,6 +32,10 @@
         <version>1.0-incubating-M2.1-SNAPSHOT</version>
     </parent>
 
+    <properties>
+        <topDirectoryLocation>..</topDirectoryLocation>
+    </properties>
+
     <!-- Local repository for the BerkelyDB-je so we don't have to use the installer script --> 
     <repositories>
         <repository>
@@ -78,6 +82,25 @@
 
     <build>
         <plugins>
+
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>QPID_HOME</name>
+                            <value>${basedir}/${topDirectoryLocation}/broker</value>
+                        </property>
+                    </systemProperties>
+                    <excludes>
+                        <exclude>**/StoreContextRaceConditionTest.class</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+
+
             <!-- Produce an all dependencies jar, to be used for running the hot backup utility as a standalone tool. -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>

Added: store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java
===================================================================
--- store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java	                        (rev 0)
+++ store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java	2008-04-16 16:14:16 UTC (rev 1923)
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.etp.qpid.server;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import java.io.File;
+
+public class StoreContextRaceConditionTest extends VMTestCase
+{
+    private static final Logger _logger = Logger.getLogger(StoreContextRaceConditionTest.class);
+    private long _TimeToLive = 0L;
+    private static long SECOND = 1000L;
+
+    private static final String LOGGING_KEY = "amqj.logging.level";
+
+    private String _loggingOriginal;
+
+    public void setUp() throws Exception
+    {
+        //Disable the logging
+        _loggingOriginal = System.getProperty(LOGGING_KEY);
+        System.setProperty(LOGGING_KEY, "WARN");
+
+        // Initialise ACLs.
+        final String qpidHome = System.getProperty("QPID_HOME");
+
+        final File defaultaclConfigFile = new File(qpidHome, "etc/persistent_config.xml");
+
+        if (!defaultaclConfigFile.exists())
+        {
+            System.err.println("Configuration file not found:" + defaultaclConfigFile);
+            fail("Configuration file not found:" + defaultaclConfigFile);
+        }
+
+        if (System.getProperty("QPID_HOME") == null)
+        {
+            fail("QPID_HOME not set");
+        }
+
+        Configuration configuration = ConfigurationFileApplicationRegistry.config(defaultaclConfigFile);
+
+        //Reset the housekeeping threads to run every second.
+        configuration.setProperty("virtualhosts.virtualhost.test.housekeeping.expiredMessageCheckPeriod", 10 * SECOND);
+        configuration.setProperty("management.enabled", "false");
+
+        _logger.info("Reset Housekeeping to :" + configuration.getLong("virtualhosts.virtualhost.test.housekeeping.expiredMessageCheckPeriod"));
+
+        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configuration);
+
+        ApplicationRegistry.initialise(config, 1);
+
+        //Remove the Vhosts we are not using to free up CPU from extra housekeeping threads.
+        VirtualHostRegistry vHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
+        vHostRegistry.getVirtualHost("localhost").close();
+        vHostRegistry.getVirtualHost("development").close();
+
+        //Create the Broker
+        super.setUp();
+
+        _queue = (Queue) _context.lookup("queue");
+    }
+
+    public void tearDown()
+    {
+        System.setProperty(LOGGING_KEY, _loggingOriginal);
+    }
+
+    protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
+
+    protected Queue _queue;
+
+    protected void sendMessages(int num) throws JMSException
+    {
+        Connection producerConnection = null;
+        try
+        {
+            producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        }
+        catch (NamingException e)
+        {
+            fail("Unable to lookup connection in JNDI.");
+        }
+
+        sendMessages(producerConnection, num);
+    }
+
+    protected void sendMessages(Connection producerConnection, int num) throws JMSException
+    {
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        producerSession.createConsumer(_queue).close();
+
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        producer.setTimeToLive(_TimeToLive);
+        producer.setDisableMessageTimestamp(false);
+
+        for (int messsageID = 0; messsageID < num; messsageID++)
+        {
+            TextMessage textMsg = producerSession.createTextMessage("Message " + messsageID);
+            textMsg.setIntProperty(MESSAGE_ID_PROPERTY, messsageID);
+            producer.send(textMsg);
+        }
+
+        producerConnection.close();
+    }
+
+    public void test() throws InterruptedException, NamingException, JMSException
+    {
+        Runnable test = new Runnable()
+        {
+            public void run()
+            {
+
+                //Create Consumer
+                Connection connection = null;
+
+                Session session = null;
+                try
+                {
+                    try
+                    {
+                        connection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+                    }
+                    catch (NamingException e)
+                    {
+                        fail("Unable to lookup connection in JNDI.");
+                    }
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                }
+                catch (JMSException e)
+                {
+                    return;
+                }
+
+                try
+                {
+                    int run = 1;
+                    while (true)
+                    {
+                        try
+                        {
+                            //Stop the connection to prevent flow
+                            connection.stop();
+                            //Create Consumer to receive msgs
+                            MessageConsumer consumer = session.createConsumer(_queue);
+
+                            //Send one message to hold up the Async Delivery from purging
+                            _logger.info("***** CREATED Consumer");
+                            _TimeToLive = 0L;
+                            sendMessages(1);
+                            _logger.info("***** SENT msg 1");
+                            //Send 1000 msgs that will time out
+                            _TimeToLive = 1000L;
+                            sendMessages(50);
+                            _logger.info("***** SENT TTL msgs");
+
+                            //Timeout Messages - Note that we
+                            Thread.sleep(1000);
+                            _logger.info("***** SLEEP");
+
+                            //Allw the messages to flow to us
+                            connection.start();
+                            _logger.info("***** START Consumer");
+                            //*** Starts Async process
+
+                            //Remove the first message so that the async will occcur and start purging.
+                            consumer.receive(1000);
+                            _logger.info("***** RECEIVE Consumer");
+
+                            sendMessages(50);
+                            _logger.info("***** SENT TTL msgs");
+
+                            //Close the consumer freeing the QHK thread to doing work
+                            consumer.close();
+                            _logger.info("***** CLOSE Consumer");
+                            //** Allows QueueHouskeeping to run.
+                            sendMessages(50);
+                            _logger.info("***** SENT TTL msgs");
+
+                            System.err.println("********************************* Running test again (" + run +
+                                               ")in attempt to cause race condition.");
+                            run++;
+                        }
+                        catch (JMSException e)
+                        {
+
+                        }
+                        catch (InterruptedException e)
+                        {
+                        }
+                    }
+                }
+                finally
+                {
+                    try
+                    {
+                        connection.close();
+                    }
+                    catch (JMSException e)
+                    {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                    _logger.info("***** Test Done");
+                }
+            }
+        };
+
+        int MAX_THREADS = 1;
+
+        Thread[] threads = new Thread[MAX_THREADS];
+
+        for (int concurentClients = 0; concurentClients < MAX_THREADS; concurentClients++)
+        {
+            threads[concurentClients] = new Thread(test);
+            threads[concurentClients].start();
+        }
+
+        for (int concurentClients = 0; concurentClients < MAX_THREADS; concurentClients++)
+        {
+            threads[concurentClients].join();
+        }
+    }
+
+    public static void main(String[] args) throws Exception, InterruptedException
+    {
+        StoreContextRaceConditionTest scrc = new StoreContextRaceConditionTest();
+
+        scrc.setUp();
+        scrc.test();
+//        scrc.tearDown();
+    }
+
+}


Property changes on: store/branches/java/M2.x/java/bdbstore/src/test/java/org/etp/qpid/server/StoreContextRaceConditionTest.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native




More information about the rhmessaging-commits mailing list