[rhmessaging-commits] rhmessaging commits: r2369 - in store/branches/java/broker-queue-refactor/java/bdbstore: src and 10 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Sep 2 10:31:35 EDT 2008


Author: ritchiem
Date: 2008-09-02 10:31:35 -0400 (Tue, 02 Sep 2008)
New Revision: 2369

Added:
   store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml
   store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access
   store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
RHM-6 :Create BDBStore Upgrade Tool, provided additional test to validate its functionality.

Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<broker>
+    <prefix>${QPID_HOME}</prefix>
+    <work>${QPID_WORK}</work>
+    <conf>${prefix}/etc</conf>
+    <connector>
+        <!-- Uncomment out this block and edit the keystorePath and keystorePassword
+             to enable SSL support
+        <ssl>
+            <enabled>true</enabled>
+            <sslOnly>true</sslOnly>
+            <keystorePath>/path/to/keystore.ks</keystorePath>
+            <keystorePassword>keystorepass</keystorePassword>
+        </ssl>-->
+        <qpidnio>false</qpidnio>
+        <protectio>
+            <enabled>false</enabled>
+        </protectio>
+        <transport>nio</transport>
+        <port>5672</port>
+        <sslport>8672</sslport>
+        <socketReceiveBuffer>32768</socketReceiveBuffer>
+        <socketSendBuffer>32768</socketSendBuffer>
+    </connector>
+    <management>
+        <enabled>true</enabled>
+        <jmxport>8999</jmxport>
+        <security-enabled>false</security-enabled>
+    </management>
+    <advanced>
+        <filterchain enableExecutorPool="true"/>
+        <enablePooledAllocator>false</enablePooledAllocator>
+        <enableDirectBuffers>false</enableDirectBuffers>
+        <framesize>65535</framesize>
+        <compressBufferOnQueue>false</compressBufferOnQueue>
+        <enableJMSXUserID>false</enableJMSXUserID>
+    </advanced>
+
+    <security>
+        <principal-databases>
+            <!-- Plain-text password file (${conf}/passwd) used as the principal database for authentication -->
+            <principal-database>
+                <name>passwordfile</name>
+                <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+                <attributes>
+                    <attribute>
+                        <name>passwordFile</name>
+                        <value>${conf}/passwd</value>
+                    </attribute>
+                </attributes>
+            </principal-database>
+        </principal-databases>
+
+        <access>
+            <class>org.apache.qpid.server.security.access.plugins.AllowAll</class>
+        </access>
+        <jmx>
+            <access>${conf}/jmxremote.access</access>
+            <principal-database>passwordfile</principal-database>
+        </jmx>
+    </security>
+
+    <virtualhosts>
+        <directory>${conf}/virtualhosts</directory>
+
+        <virtualhost>
+            <name>localhost</name>
+            <localhost>
+                <store>
+                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+                </store>
+
+                <housekeeping>
+                    <expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
+                </housekeeping>
+
+            </localhost>
+        </virtualhost>
+
+        <virtualhost>
+            <name>development</name>
+            <development>
+                <store>
+                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+                </store>
+            </development>
+        </virtualhost>
+
+        <virtualhost>
+            <name>test</name>
+            <test>
+                <store>
+                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+                </store>
+            </test>
+        </virtualhost>
+
+    </virtualhosts>
+    <heartbeat>
+        <delay>0</delay>
+        <timeoutFactor>2.0</timeoutFactor>
+    </heartbeat>
+    <queue>
+        <auto_register>true</auto_register>
+    </queue>
+
+    <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
+
+

Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#Generated by JMX Console : Last edited by user:admin
+#Tue Jun 12 16:46:39 BST 2007
+admin=admin
+guest=readonly
+user=readwrite

Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+guest:guest
+client:guest
+server:guest
+

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-09-02 14:30:22 UTC (rev 2368)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-09-02 14:31:35 UTC (rev 2369)
@@ -69,7 +69,6 @@
     private static volatile int _loops;
     private String TEST_LOCATION = "bdbTestEnv";
     File BDB_DIR = new File(TEST_LOCATION);
-    
 
     public void setUp() throws Exception
     {
@@ -83,13 +82,13 @@
         BDB_DIR.mkdirs();
 
         _store = new BDBMessageStore();
+        _store.configure(BDB_DIR);
 
-        _store.createEnvironment(BDB_DIR);
-        _store.openDatabases();
         _virtualHost = new VirtualHost("test", _store);
         _store.setVirtualHost(_virtualHost);
-        _store.startCommitThread();
 
+        _store.recover();
+
         _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
     }
 
@@ -111,24 +110,24 @@
     private void reload() throws Exception
     {
         _virtualHost.close();
-        
+
         PropertiesConfiguration env = new PropertiesConfiguration();
 
         env.addProperty("store.environment-path", "bdbTestEnv");
         env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
 
         _virtualHost = new VirtualHost("test", env);
-        _store = (BDBMessageStore)_virtualHost.getMessageStore();
+        _store = (BDBMessageStore) _virtualHost.getMessageStore();
     }
 
     public void tearDown() throws Exception
     {
         _virtualHost.close();
 
-        ApplicationRegistry.removeAll();
+        ApplicationRegistry.remove(1);
     }
 
-    public void     testExchangePersistence() throws Exception
+    public void testExchangePersistence() throws Exception
     {
         FieldTable queueArguments = new FieldTable();
         Integer priorityLevel = 5;
@@ -332,7 +331,7 @@
         {
             // pass since exception expected
         }
-        
+
     }
 
     public void testTranCommit() throws Exception
@@ -348,7 +347,7 @@
         _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue, null);
+        _store.createQueue(queue);
 
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 20L);
@@ -381,7 +380,7 @@
         _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue, null);
+        _store.createQueue(queue);
 
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 30L);
@@ -395,17 +394,16 @@
         _store.beginTran(_storeContext);
         _store.commitTran(_storeContext);
 
-                enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
         assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         assertEquals("First Message is incorrect", 30L, val.longValue());
         val = enqueuedIds.get(1);
         assertEquals("Second Message is incorrect", 31L, val.longValue());
-        
+
     }
 
-
     public void testTranRollback2() throws Exception
     {
         List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -423,7 +421,7 @@
         _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue, null);
+        _store.createQueue(queue);
 
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 30L);
@@ -461,8 +459,8 @@
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
 
-        _store.createQueue(queue, null);
-        _store.createQueue(queue2, null);
+        _store.createQueue(queue);
+        _store.createQueue(queue2);
 
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 40L);
@@ -504,7 +502,7 @@
         _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue, null);
+        _store.createQueue(queue);
 
         _store.enqueueMessage(_storeContext, queue, 50L);
         _store.dequeueMessage(_storeContext, queue, 50L);
@@ -513,7 +511,7 @@
     public void testQueueRemove() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue, null);
+        _store.createQueue(queue);
         _store.removeQueue(queue);
         try
         {

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
+import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.util.FileUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import junit.framework.TestCase;
+
+import java.io.File;
+import java.io.IOException;
+
+public class BDBUpgradeTest extends TestCase
+{
+    protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
+
+    final String BDBHome = System.getProperty("BDB_HOME");
+    final File _configFile = new File(BDBHome, "etc/config.xml");
+
+    private String VIRTUALHOST = "test";
+
+    private static final String VERSION_1 = "1";
+    private static final String VERSION_2 = "2";
+
+    private String _topic = "MyDurableSubscriptionTestTopic";
+
+    String _fromDir = System.getProperty("QPID_WORK")+"/version1Store";
+    String _toDir = System.getProperty("QPID_WORK")+"/version2Store";
+
+
+    public void setUp() throws IOException
+    {
+        assertNotNull("QPID_WORK must be set", System.getProperty("QPID_WORK"));
+
+        File directory = new File(_toDir);
+
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }        
+
+        directory = new File(_fromDir);
+
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
+
+    }
+
+
+    public void testDurababilitySelectors() throws Exception
+    {
+        String broker = "vm://:1";
+
+        startBroker(1, VERSION_1);
+
+        new DurableSubscriber(broker, _topic, null).close();
+
+        sendMessages(broker, _topic, 10);
+
+        sendAndCheckDurableSubscriber(broker, false, true, 5, "odd=true");
+
+        Thread.sleep(2000);
+
+        stopBroker(1);
+
+        upgradeBroker();
+
+        broker = "vm://:2";
+        
+        startBroker(2, VERSION_2);
+
+        //Ensure msg were transitioned to new broker
+        sendAndCheckDurableSubscriber(broker, false, false, 5, null);
+
+        //Reset the Selector Pattern
+        new DurableSubscriber(broker, _topic, "odd=true").close();
+
+        stopBroker(2);
+
+        startBroker(2, VERSION_2);
+
+        //Ensure that the selector was preserved on restart and caused all msgs to be removed.
+        sendAndCheckDurableSubscriber(broker, false, false, 0, null);
+        stopBroker(2);           
+    }
+
+    public void testDurabability() throws Exception
+    {
+        String broker = "vm://:1";
+
+        startBroker(1, VERSION_1);
+
+        sendAndCheckDurableSubscriber(broker, true, false, 10, null);
+
+        stopBroker(1);
+
+        startBroker(1, VERSION_1);
+
+        sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+
+        stopBroker(1);
+    }
+
+    private void upgradeBroker() throws Exception
+    {
+        new BDBStoreUpgrade(_fromDir, _toDir).upgradeFromVersion(1);
+    }
+
+    private void stopBroker(int port)
+    {
+        TransportConnection.killVMBroker(port);
+        ApplicationRegistry.remove(port);
+    }
+
+    private void startBroker(int port, String version) throws Exception
+    {
+        //Load the broker configuration; the store version is set from the 'version' argument below
+        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(_configFile);
+
+        //Disable management on broker.
+        config.getConfiguration().setProperty("management.enabled", "false");
+
+        Configuration testVirtualhost = config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST);
+        testVirtualhost.setProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+        testVirtualhost.setProperty("store." + BDBMessageStore.ENVIRONMENT_PATH_PROPERTY, "${work}/version" + port + "Store");
+        testVirtualhost.setProperty("store.version", version);
+
+        ApplicationRegistry.initialise(config, port);
+        TransportConnection.createVMBroker(port);
+
+    }
+
+    public void sendAndCheckDurableSubscriber(String broker, boolean send, boolean commitRecieved, int count, String selector)
+            throws Exception
+    {
+        // Subscribes to _topic, optionally publishes 'count' messages, then waits for exactly 'count' deliveries.
+        DurableSubscriber subscriber = new DurableSubscriber(broker, _topic, selector);
+
+        try
+        {
+            if (send)
+            {
+                sendMessages(broker, _topic, count);
+            }
+
+            // Allow an initial 1s for delivery, then poll once a second;
+            // the assertion below fails the test after 5 further polls.
+            int loop = 0;
+
+            Thread.sleep(1000);
+
+            while (subscriber.getMessages().size() < count)
+            {
+                assertTrue("Waited more than 5 seconds for " + count + " messages to be delivered:"
+                           + subscriber.getMessages().size(), loop < 5);
+                loop++;
+                Thread.sleep(1000);
+            }
+
+            assertEquals("Incorrect number of messages received", count, subscriber.getMessages().size());
+
+            if (commitRecieved)
+            {
+                subscriber.commit();
+            }
+        }
+        finally
+        {
+            subscriber.close();
+        }
+    }
+
+    private void sendMessages(String broker, String topic, int totalMessages) throws JMSException
+    {
+
+        Publisher publisher = new Publisher(broker, topic);
+
+        for (int i = 0; i < totalMessages; i++)
+        {
+            Message msg = publisher.createTextMessage("message " + i);
+            msg.setBooleanProperty("odd", i % 2 == 0);
+            publisher.send(msg);
+        }
+
+        publisher.commit();
+    }
+
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.utils;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import java.util.List;
+import java.util.LinkedList;
+
+public class DurableSubscriber implements MessageListener
+{
+    private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriber.class);
+
+    JNDIHelper _jndiHelper;
+
+    Session _session;
+    TopicSubscriber _subscriber;
+    AMQConnection _connection;
+    private List<Message> _received;
+
+    public static void main(String[] args) throws JMSException
+    {
+        new DurableSubscriber(); // no-arg constructor connects to JNDIHelper.DEFAULT_BROKER
+    }
+
+    public DurableSubscriber() throws JMSException
+    {
+        this(JNDIHelper.DEFAULT_BROKER, null);
+    }
+
+    public DurableSubscriber(String broker, String topic) throws JMSException
+    {
+        this(broker, topic, null);
+    }
+
+    public DurableSubscriber(String broker, String topicStr, String selector) throws JMSException
+    {
+        _jndiHelper = new JNDIHelper(broker);
+
+        _connection = (AMQConnection) ((ConnectionFactory) _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME)).createConnection();
+
+        _jndiHelper.close();
+
+        AMQTopic topic = new AMQTopic(_connection, topicStr);
+
+        _logger.debug("Create Session");
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _logger.debug("Create Durable Subscriber on Session");
+
+        if (selector != null)
+        {
+            _subscriber = _session.createDurableSubscriber(topic, "MySubscription", selector, false);
+        }
+        else
+        {
+            _subscriber = _session.createDurableSubscriber(topic, "MySubscription");
+        }
+
+        // onMessage() is invoked asynchronously by the JMS provider while callers poll getMessages(); synchronize the list.
+        _received = java.util.Collections.synchronizedList(new LinkedList<Message>());
+        _subscriber.setMessageListener(this);
+
+        _connection.start();
+    }
+
+    public void close() throws JMSException
+    {
+        _connection.close();
+    }
+
+    public void onMessage(Message message)
+    {
+        _received.add(message);
+    }
+
+    public List<Message> getMessages()
+    {
+        return _received;
+    }
+
+    public void commit() throws JMSException
+    {
+        _session.commit();
+    }
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.utils;
+
+import javax.naming.NamingException;
+import javax.naming.InitialContext;
+import javax.naming.Context;
+import java.util.Properties;
+
+public class JNDIHelper
+{
+    public static final String DEFAULT_BROKER = "tcp://localhost:2345";
+    public static final String CONNECTION_JNDI_NAME = "local";
+
+    public final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+    public final String CONNECTION_NAME;
+
+    InitialContext _ctx;
+
+    public JNDIHelper(String broker) 
+    {
+        CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + broker + "'";
+        setupJNDI();
+    }
+
+    /**
+     * Lookup the specified name in the JNDI Context.
+     *
+     * @param name The string name of the object to lookup
+     *
+     * @return The object or null if nothing exists for specified name
+     */
+    public Object lookupJNDI(String name)
+    {
+        try
+        {
+            return _ctx.lookup(name);
+        }
+        catch (NamingException e)
+        {
+            System.err.println("Error looking up '" + name + "' in JNDI Context:" + e);
+        }
+
+        return null;
+    }
+
+     /**
+     * Setup the JNDI context.
+     *
+     * In this case we are simply using a Properties object to store the pairing information.
+     *
+     * Further details can be found on the wiki site here:
+     *
+     * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html
+     */
+    private void setupJNDI()
+    {
+        // Set the properties ...
+        Properties properties = new Properties();
+        properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
+        properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
+
+        // Create the initial context. If this fails the error is only printed and
+        // _ctx is left unassigned (null), so later lookupJNDI()/close() calls will fail.
+        try
+        {
+            _ctx = new InitialContext(properties);
+        }
+        catch (NamingException e)
+        {
+            System.err.println("Error Setting up JNDI Context:" + e);
+        }
+    }
+
+    /** Close the JNDI Context to keep everything happy. */
+    public void close()
+    {
+        try
+        {
+            _ctx.close();
+        }
+        catch (NamingException e)
+        {
+            System.err.println("Unable to close JNDI Context : " + e);
+        }
+    }
+}
+
+

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.utils;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Test utility that publishes JMS messages through a transacted session.
+ *
+ * The connection is created from the connection factory that {@link JNDIHelper} registers in JNDI.
+ * The session is created as transacted, so callers use {@link #commit()} to commit what they sent.
+ */
+public class Publisher
+{
+    private static final Logger _logger = LoggerFactory.getLogger(Publisher.class);
+
+    /** Helper used to look up the connection factory; closed again once the connection exists. */
+    JNDIHelper _jndiHelper;
+
+    /** Transacted session used to create messages and the producer. */
+    Session _session;
+
+    /** Producer for the requested topic, or one created with a null destination when no topic was given. */
+    MessageProducer _publisher;
+
+    /** Connection to the broker; closed by {@link #close()}. */
+    AMQConnection _connection;
+
+    /**
+     * Creates a Publisher against the default broker. Nothing is sent and the instance is not closed.
+     *
+     * @param args ignored
+     * @throws JMSException if the connection, session or producer cannot be created
+     */
+    public static void main(String[] args) throws JMSException
+    {
+        new Publisher();
+    }
+
+    /**
+     * Creates a Publisher on {@link JNDIHelper#DEFAULT_BROKER} without a topic.
+     *
+     * @throws JMSException if the connection, session or producer cannot be created
+     */
+    public Publisher() throws JMSException
+    {
+        this(JNDIHelper.DEFAULT_BROKER, null);
+    }
+
+    /**
+     * Connects to the given broker and prepares a transacted session with a message producer.
+     *
+     * @param broker   the broker details handed to {@link JNDIHelper}
+     * @param topicStr the topic to publish to, or null to create the producer with a null destination
+     *
+     * @throws JMSException if the connection factory cannot be looked up, or the connection, session
+     *                      or producer cannot be created
+     */
+    public Publisher(String broker, String topicStr) throws JMSException
+    {
+        _jndiHelper = new JNDIHelper(broker);
+
+        try
+        {
+            Object factory = _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME);
+            if (factory == null)
+            {
+                // lookupJNDI() reports a failed lookup on stderr and returns null instead of throwing.
+                throw new JMSException("Unable to look up connection factory '"
+                                       + JNDIHelper.CONNECTION_JNDI_NAME + "' in JNDI");
+            }
+
+            _connection = (AMQConnection) ((ConnectionFactory) factory).createConnection();
+        }
+        finally
+        {
+            // The context is only needed for the lookup; release it even when connecting fails.
+            _jndiHelper.close();
+        }
+
+        _logger.debug("Create Session");
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _logger.debug("Create publisher on Session");
+
+        if (topicStr != null)
+        {
+            _publisher = _session.createProducer(_session.createTopic(topicStr));
+        }
+        else
+        {
+            // NOTE(review): a producer created with a null destination has no default destination
+            // in the JMS API, so send(Message) presumably cannot be used in this mode - confirm.
+            _publisher = _session.createProducer(null);
+        }
+    }
+
+    /**
+     * Closes the connection to the broker.
+     *
+     * @throws JMSException if the connection cannot be closed
+     */
+    public void close() throws JMSException
+    {
+        _connection.close();
+    }
+
+    /**
+     * Commits the transacted session.
+     *
+     * @throws JMSException if the commit fails
+     */
+    public void commit() throws JMSException
+    {
+        _session.commit();
+    }
+
+    /**
+     * Creates a text message on this publisher's session.
+     *
+     * @param msg the text of the message
+     * @return the newly created message
+     * @throws JMSException if the message cannot be created
+     */
+    public Message createTextMessage(String msg) throws JMSException
+    {
+        return _session.createTextMessage(msg);
+    }
+
+    /**
+     * Sends the message using this publisher's producer.
+     *
+     * @param msg the message to send
+     * @throws JMSException if the send fails
+     */
+    public void send(Message msg) throws JMSException
+    {
+        _publisher.send(msg);
+    }
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2008-09-02 14:31:35 UTC (rev 2369)
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.AMQException;
+
+import java.io.File;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Database;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.ByteBinding;
+
+/**
+ * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V1 Store to a V2 Store.
+ *
+ * NOTE: No checks are in place to validate that the input is V1.
+ *
+ * Currently upgrade is fixed from v1 -> v2
+ * Only the Queue and Binding databases are migrated all other databases are copied as DB entries.
+ *
+ * Improvments:
+ * - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added.
+ * - Add a version value into the store so that a quick check can be performed to perform the upgrades.
+ * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade.
+ * - Currently only the Queue and Binding DB are processed for upgrade all the other db data is copied between stores.
+ * - Add process logging and disable all Store and Qpid logging.
+ */
+public class BDBStoreUpgrade
+{
+    private static final String FROM_DIR = "-input";
+    private static final String TO_DIR = "-output";
+
+    private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class);
+    /** The Store Directory that needs upgrading */
+    File _fromDir;
+    /** The Directory that will be made to contain the upgraded store */
+    File _toDir;
+
+    /** The Old Store */
+    BDBMessageStore _oldMessageStore;
+    /** The New Store */
+    BDBMessageStore _newMessageStore;
+    /** A VHost used in the migration of the queues from the old VHost */
+    VirtualHost _newVirtualHost;
+    /** A VHost used in the creation of the queues from the old store */
+    VirtualHost _oldVirtualHost;
+
+    public BDBStoreUpgrade(String fromDir, String toDir)
+    {
+        _fromDir = new File(fromDir);
+        if (!_fromDir.exists())
+        {
+            if (!_fromDir.mkdirs())
+            {
+                throw new IllegalArgumentException("BDBStore path " + fromDir + " could not be read. "
+                                                   + "Ensure the path is correct and that the permissions are correct.");
+            }
+        }
+
+        _toDir = new File(toDir);
+        if (!_toDir.exists())
+        {
+            if (!_toDir.mkdirs())
+            {
+                throw new IllegalArgumentException("Environment path " + toDir + " could not be created. "
+                                                   + "Ensure the path is correct and that the permissions are correct.");
+            }
+        }
+        else
+        {
+            throw new IllegalArgumentException("Environment path " + toDir + " already exists. ");
+
+        }
+
+    }
+
+    /**
+     * Upgrade a Store of a specified version to the latest version.
+     * @param version the version of the current store
+     * @throws Exception
+     */
+    public void upgradeFromVersion(int version) throws Exception
+    {
+        upgradeFromVersion(version, _fromDir, _toDir);
+    }
+
+    /**
+     * Upgrade a Store of a specified version to the latest version.
+     *
+     * @param version the version of the current store
+     * @param fromDir the directory with the old Store
+     * @param toDir the directrory to hold the newly Upgraded Store
+     *
+     * @throws Exception  due to Virtualhost/MessageStore.close() being rather poor at exception handling
+     * @throws DatabaseException if there is a problem with the store formats
+     * @throws AMQException if there is an issue creating Qpid data structures
+     */
+    public void upgradeFromVersion(int version, File fromDir, File toDir) throws Exception
+    {
+
+        // Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
+        _newVirtualHost = new VirtualHost("Upgraded", new MemoryMessageStore());
+        _oldVirtualHost = new VirtualHost("Old", new MemoryMessageStore());
+
+        //Create a new messageStore
+        _newMessageStore = new BDBMessageStore();
+        _newMessageStore.configure(_newVirtualHost, toDir);
+
+        //Load the old MessageStore
+        switch (version)
+        {
+            default:
+            case 1:
+                _oldMessageStore = new BDBMessageStore(1);
+                _oldMessageStore.configure(_oldVirtualHost, fromDir);
+                upgradeFromVersion_1();
+                break;
+        }
+
+        _oldMessageStore.close();
+        _newMessageStore.close();
+
+        _newVirtualHost.close();
+        _oldVirtualHost.close();
+        
+        //Shutdown the AR that the Vhosts will have created.
+        ApplicationRegistry.remove(1);
+    }
+
+    private void upgradeFromVersion_1() throws AMQException, DatabaseException
+    {
+        //Migrate _messageMetaDataDb;
+        moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb());
+
+        //Migrate _messageContentDb;
+        moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb());
+
+        //Migrate _queueDb;
+        //Get the oldMessageStore Tuple Binding which does the parsing
+        final TupleBinding queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
+        _oldMessageStore.visitQueues(new BDBMessageStore.DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException
+            {
+                AMQQueue queue = (AMQQueue) queueTupleBinding.entryToObject(value);
+
+                _newMessageStore.createQueue(queue);
+                //
+                _newMessageStore.getQueueRegistry().registerQueue(queue);
+                //or
+                //queue.stop();
+                // As the queue will create a thread pool message delivery that needs to be shutdown
+                // for the broker to cleanly stop.
+            }
+        });
+
+        //Migrate _deliveryDb;
+        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb());
+
+        //Migrate _exchangeDb;
+        moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb());
+
+        //Migrate _queueBindingsDb;
+        final TupleBinding bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+        _oldMessageStore.visitBindings(new BDBMessageStore.DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException, DatabaseException
+            {
+                BindingKey queueBinding = (BindingKey) bindingTupleBinding.entryToObject(key);
+
+                TupleBinding newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
+
+                DatabaseEntry newKey = new DatabaseEntry();
+                newBindingTupleBinding.objectToEntry(queueBinding, newKey);
+
+                ByteBinding.byteToEntry((byte) 0, value);
+                _newMessageStore.getBindingsDb().put(null, newKey, value);
+            }
+        });
+
+    }
+
+    private void moveContents(Database oldDatabase, final Database newDatabase) throws AMQException, DatabaseException
+    {
+        _oldMessageStore.visitDatabase(oldDatabase, new BDBMessageStore.DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException, DatabaseException
+            {
+                newDatabase.put(null, key, value);
+            }
+        });
+
+    }
+
+    private static void usage()
+    {
+        System.out.println("usage: -fromDir <path-to-store-to-upgrade> -toDir <path-to-location-to-write-upgraded-db>");
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+        String fromDir, toDir;
+
+        if (args.length < 4 ||
+            (!args[0].equals(FROM_DIR) && !args[2].equals(TO_DIR)))
+        {
+            usage();
+            System.exit(1);
+        }
+
+        fromDir = args[1];
+        toDir = args[3];
+
+        new BDBStoreUpgrade(fromDir, toDir).upgradeFromVersion(1);
+
+        System.out.println("Upgrade complete");
+    }
+}




More information about the rhmessaging-commits mailing list