[rhmessaging-commits] rhmessaging commits: r3778 - in store/branches/java/0.5-release: src/tools/java and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jan 11 10:00:28 EST 2010


Author: rgemmell
Date: 2010-01-11 10:00:27 -0500 (Mon, 11 Jan 2010)
New Revision: 3778

Added:
   store/branches/java/0.5-release/bin/bindingsWorkaround.sh
   store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml
   store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java
Log:
Add initial version of tool to workaround broker bindings issue. Use the broker config to load the VirtualHosts and then insert required durable Exchange entries into the store


Added: store/branches/java/0.5-release/bin/bindingsWorkaround.sh
===================================================================
--- store/branches/java/0.5-release/bin/bindingsWorkaround.sh	                        (rev 0)
+++ store/branches/java/0.5-release/bin/bindingsWorkaround.sh	2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+QPID_VERSION=0.5
+
+if [ -z "$QPID_HOME" ]; then
+    echo "QPID_HOME not set. Exiting"
+    exit 1
+fi
+
+if [ -z "$QPID_WORK" ]; then
+    echo "QPID_WORK not set. Exiting"
+    exit 1
+fi
+
+QPID_SYS_PROPS="-DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK"
+
+LIBS=$QPID_HOME/lib/qpid-bdbtools-$QPID_VERSION.jar:$QPID_HOME/lib/je-3.3.62.jar:$QPID_HOME/lib/qpid-bdbstore-$QPID_VERSION.jar:$QPID_HOME/lib/qpid-all.jar
+
+java -Dlog4j.configuration=BDBStoreBindingsWorkaround-log4j.xml $QPID_SYS_PROPS -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBStoreBindingsWorkaround "$@"
+exitValue=$?
+
+if [ $exitValue != 0 ] 
+then
+  exit $exitValue
+fi
+

Added: store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml
===================================================================
--- store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml	                        (rev 0)
+++ store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml	2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - %m%n"/>
+        </layout>
+    </appender>
+
+    <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreBindingsWorkaround">
+        <priority value="info"/>
+    </category>
+
+    <!-- Only show errors -->
+    <category name="org.apache">
+        <priority value="error"/>
+    </category>
+
+    <root>
+        <priority value="error"/>
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</log4j:configuration>

Added: store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java
===================================================================
--- store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java	                        (rev 0)
+++ store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java	2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,310 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.configuration.Configuration.InitException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class BDBStoreBindingsWorkaround
+{
+    private static final Logger _logger = LoggerFactory.getLogger(BDBStoreBindingsWorkaround.class);
+    
+    private Configuration _config;
+    private boolean _initialised = false;
+
+    public static void main(String[] args) throws Configuration.InitException
+    {
+        BDBStoreBindingsWorkaround tool = new BDBStoreBindingsWorkaround(args);
+
+        tool.start();
+
+        //Shut down the JVM gracefully, the ShutdownHook will stop the VirtualHosts.
+        System.exit(0);
+    }
+
+    public BDBStoreBindingsWorkaround(String[] args) throws InitException
+    {     
+        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
+
+        loadConfig(args);
+    }
+
+    @SuppressWarnings("static-access")
+    private void loadConfig(String args[]) throws InitException
+    {
+        _config = new Configuration();
+        
+        Option configFile =
+            OptionBuilder.withArgName("file").hasArg()
+            .withDescription("use given configuration file By "
+                    + "default looks for a file named "
+                    + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
+                    .withLongOpt("config")
+                    .create("c");
+
+        _config.setOption(configFile);
+        
+        _config.processCommandline(args);
+    }
+    
+    /**
+     * Simple ShutdownHook to cleanly shutdown the VirtualHosts on JVM shut down
+     */
+    protected class ShutdownHook implements Runnable
+    {
+        BDBStoreBindingsWorkaround _tool;
+
+        ShutdownHook(BDBStoreBindingsWorkaround bindingsTool)
+        {
+            _tool = bindingsTool;
+        }
+
+        public void run()
+        {
+            _tool.shutdown();
+        }
+    }
+
+    protected void shutdown()
+    {
+        if (_initialised)
+        {
+            ApplicationRegistry.remove(1);
+        }
+    }
+
+    protected void start()
+    {
+        _initialised = false;
+        
+        _logger.info("BDBStore BindingsWorkaround process commencing");
+        
+        loadVirtualHosts();
+
+        if (!_initialised)
+        {
+            System.exit(1);
+        }
+        
+        addDurableExchangesToStoreIfRequired();
+        _logger.info("Workaround process complete");
+    }
+
+    private void loadVirtualHosts()
+    {
+        final File configFile = _config.getConfigFile();
+
+        if (!configFile.exists())
+        {
+            _logger.error("Config file not found:" + configFile.getAbsolutePath());
+            _logger.error("Options: [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
+            return;
+        }
+        else
+        {
+            _logger.info("Using config file :" + configFile.getAbsolutePath());
+        }
+
+        try
+        {
+            _logger.info("Starting the VirtualHosts");
+            
+            ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
+            
+            disableManagementStartup(registry.getConfiguration());
+            
+            ApplicationRegistry.remove(1);
+            ApplicationRegistry.initialise(registry);
+
+            checkMessageStores();
+            _initialised = true;
+        }
+        catch (ConfigurationException e)
+        {
+            _logger.error("Unable to load configuration due to configuration error: " + e.getMessage());
+            e.printStackTrace();
+            System.exit(1);
+        }
+        catch (Exception e)
+        {
+            _logger.error("Unable to load configuration due to: " + e.getMessage());
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    private void disableManagementStartup(ServerConfiguration config)
+    {
+        /*
+         * Update the server config to indicate management is disabled, which
+         * should prompt usage of the NoopManagedObjectRegistry instead of
+         * the JMXManagedObjectRegistry.
+         */
+        config.setManagementEnabled(false);
+        
+        /* 
+         * Set the com.sun.management.jmxremote property (to any value) 
+         * if it isnt already set.
+         * 
+         * The JMXManagedObjectRegistry checks for this system property before
+         * starting its own ConnectorServer programatically as this (usually)
+         * implies that the JVM was started explicitly with properties to enable
+         * its out-of-the-box JMX agent, to which JMXManagedObjectRegistry defers.
+         */
+        if(System.getProperty("com.sun.management.jmxremote") == null)
+        {
+            System.setProperty("com.sun.management.jmxremote", "8999");
+        }
+    }
+
+    private void checkMessageStores()
+    {
+        Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+        for (VirtualHost vhost : vhosts)
+        {
+            if(! (vhost.getMessageStore() instanceof BDBMessageStore) )
+            {
+                _logger.warn("Virtualhost '" + vhost.getName() + "' is not using a BDBMessageStore. "
+                                 + "No changes will be made to it.");
+            }
+        }
+    }
+
+    private static List<AMQShortString> getBDBStoreExchangeNames(final VirtualHost vhost, final BDBMessageStore store)
+    {
+        final List<AMQShortString> exchanges = new ArrayList<AMQShortString>();
+
+        //Create a visitor to visit the Exchange entries and gather the names
+        DatabaseVisitor exchangeVisitor = new DatabaseVisitor()
+        {
+            @SuppressWarnings("unchecked")
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+            {
+                _logger.debug("Visiting new Exchange store entry");
+                
+                TupleBinding binding = new ExchangeTB(vhost);
+                Exchange exchange = (Exchange) binding.entryToObject(value);
+
+                if(exchange != null)
+                {
+                    AMQShortString name = exchange.getName();
+                    if(name != null)
+                    {
+                        _logger.debug("Visited store entry for Exchange: " + name);
+                        exchanges.add(name);
+                    }
+                }
+            }
+        };
+
+        try
+        {
+            store.visitExchanges(exchangeVisitor);
+        }
+        catch (Exception e)
+        {
+            _logger.error("Error retrieving exiting Exchange names from the store: " + e.getMessage());
+            e.printStackTrace();
+        }
+        
+        return exchanges;
+    }
+    
+    private static void addDurableExchangesToStoreIfRequired()
+    {
+        Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+        
+        //For each active VHost, add all durable exchanges in the ExchangeRegistry to the store if they arent already in it.
+        for(VirtualHost vhost : vhosts)
+        {
+            _logger.info("Beginning process for VirtualHost: " + vhost.getName());
+            
+            BDBMessageStore store;
+            
+            if(!(vhost.getMessageStore() instanceof BDBMessageStore) )
+            {
+                _logger.info("Store is not a BDBMessageStore, skipping");
+                continue;
+            }
+            else
+            {
+                store = (BDBMessageStore) vhost.getMessageStore();
+            }
+            
+            List<AMQShortString> bdbExchangeNames = getBDBStoreExchangeNames(vhost, store);
+            
+            ExchangeRegistry registry = vhost.getExchangeRegistry();
+            Collection<AMQShortString> exchangeNames = registry.getExchangeNames();
+            
+            for(AMQShortString exchangeName : exchangeNames)
+            {
+                Exchange exchange = registry.getExchange(exchangeName);
+                if(exchange.isDurable())
+                {
+                    if(bdbExchangeNames.contains(exchangeName))
+                    {
+                        _logger.info("Store already contains entry for Exchange: " + exchangeName);
+                    }
+                    else
+                    {
+                        try
+                        {
+                            _logger.info("Adding store entry for Exchange: " + exchangeName);
+                            store.createExchange(exchange);
+                        }
+                        catch (AMQException e)
+                        {
+                            _logger.error("Error adding entry to store for Exchange '" + exchangeName + "':" + e.getMessage());
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            }
+        }
+    }
+    
+}



More information about the rhmessaging-commits mailing list