[rhmessaging-commits] rhmessaging commits: r2368 - in store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb: tuples and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Sep 2 10:30:22 EDT 2008


Author: ritchiem
Date: 2008-09-02 10:30:22 -0400 (Tue, 02 Sep 2008)
New Revision: 2368

Added:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
Removed:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
RHM-5 : Created version-aware tuples to load the two database formats we now have

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-08-29 14:36:01 UTC (rev 2367)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -43,9 +43,13 @@
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.mina.common.TrafficMask;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -73,8 +77,12 @@
 {
     private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
 
-    private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+    private static final int DATABASE_FORMAT_VERSION = 2;
+    private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
 
+
+    public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+
     private Environment _environment;
 
     private static final String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
@@ -123,10 +131,19 @@
 
     private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
 
+    // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
+
+    private QueueTupleBindingFactory _queueTupleBindingFactory;
+    private BindingTupleBindingFactory _bindingTupleBindingFactory;
+
+    /** The data version this store should run with */
+    private int _version;
+
     private enum State
     {
         INITIAL,
         CONFIGURING,
+        CONFIGURED,
         RECOVERING,
         STARTED,
         CLOSING,
@@ -135,6 +152,16 @@
 
     private State _state = State.INITIAL;
 
+    public BDBMessageStore()
+    {
+        this(DATABASE_FORMAT_VERSION);
+    }
+
+    public BDBMessageStore(int version)
+    {
+        _version = version;
+    }
+
     /**
      * Called after instantiation in order to configure the message store. A particular implementation can define
      * whatever parameters it wants.
@@ -150,10 +177,6 @@
     public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
     {
 
-        stateTransition(State.INITIAL, State.CONFIGURING);
-
-        _log.info("Configuring BDB message store");
-
         File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
         if (!environmentPath.exists())
         {
@@ -164,24 +187,60 @@
             }
         }
 
-        createEnvironment(environmentPath);
-        openDatabases();
-        _virtualHost = virtualHost;
+        _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, 2);
 
-        _commitThread.start();
 
+        configure(virtualHost, environmentPath);
+    }
 
-        upgradeIfNecessary();
+    public void configure(File environmentPath) throws AMQException, DatabaseException
+    {
+        configure(null, environmentPath);
+    }
 
+    public void configure(VirtualHost virtualHost, File environmentPath) throws AMQException, DatabaseException
+    {
+        stateTransition(State.INITIAL, State.CONFIGURING);
+
+        _log.info("Configuring BDB message store");
+
+        if (virtualHost != null)
+        {
+            setVirtualHost(virtualHost);
+            createTupleBindingFactories(_version);
+        }
+
+        setupStore(environmentPath);
+
+        stateTransition(State.CONFIGURING, State.CONFIGURED);
+
         // this recovers durable queues and persistent messages
-        recover();
+        if (virtualHost != null)
+        {
+            recover();
+        }
+    }
 
-        stateTransition(State.RECOVERING, State.STARTED);
+    private void setupStore(File storePath) throws DatabaseException, AMQException
+    {
+        checkState(State.CONFIGURING);
+
+        createEnvironment(storePath);
+
+        openDatabases();
+
+        _commitThread.start();
     }
 
-    private void upgradeIfNecessary()
+    protected void startStore() throws AMQException
     {
+        stateTransition(State.CONFIGURED, State.STARTED);
+    }
 
+    private void createTupleBindingFactories(int version)
+    {
+        _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
+        _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost);
     }
 
     private synchronized void stateTransition(State requiredState, State newState) throws AMQException
@@ -203,7 +262,7 @@
         }
     }
 
-    void createEnvironment(File environmentPath) throws DatabaseException
+    private void createEnvironment(File environmentPath) throws DatabaseException
     {
         _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
         EnvironmentConfig envConfig = new EnvironmentConfig();
@@ -213,7 +272,7 @@
         _environment = new Environment(environmentPath, envConfig);
     }
 
-    protected void openDatabases() throws DatabaseException
+    private void openDatabases() throws DatabaseException
     {
         DatabaseConfig dbConfig = new DatabaseConfig();
         dbConfig.setTransactional(true);
@@ -493,16 +552,15 @@
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
 
-            BindingTB binding = new BindingTB(_virtualHost);
-
             BindingKey queueBinding =
                     new BindingKey(exchange.getName(), null, null, null);
 
-            EntryBinding keyBinding = new BindingTB(_virtualHost);
+            EntryBinding<BindingKey> keyBinding = _bindingTupleBindingFactory.getInstance();
             keyBinding.objectToEntry(queueBinding, key);
 
             OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
 
+            TupleBinding binding = _bindingTupleBindingFactory.getInstance();
             while (opStatus == OperationStatus.SUCCESS)
             {
                 queueBinding = (BindingKey) binding.entryToObject(key);
@@ -538,7 +596,7 @@
             cursor = _exchangeDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
-            ExchangeTB binding = new ExchangeTB(_virtualHost);
+            TupleBinding binding = new ExchangeTB(_virtualHost);
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
                 Exchange exchange = (Exchange) binding.entryToObject(value);
@@ -579,8 +637,9 @@
         if (_state != State.RECOVERING)
         {
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new BindingTB(_virtualHost);
+            EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
             keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+
             DatabaseEntry value = new DatabaseEntry();
             ByteBinding.byteToEntry((byte) 0, value);
 
@@ -610,7 +669,7 @@
             throws AMQException
     {
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new BindingTB(_virtualHost);
+        EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
         keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
 
         try
@@ -652,10 +711,15 @@
             _queueNameToIdMap.put(queue.getName(), queueId);
 
             DatabaseEntry key = new DatabaseEntry();
+
             EntryBinding keyBinding = new AMQShortStringTB();
             keyBinding.objectToEntry(queue.getName(), key);
+
             DatabaseEntry value = new DatabaseEntry();
-            TupleBinding queueBinding = new QueueTB(_virtualHost, arguments);
+            TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+
+            ((QueueTuple) queueBinding).setArguments(arguments);
+
             queueBinding.objectToEntry(queue, value);
             try
             {
@@ -718,7 +782,7 @@
         try
         {
             _queueDb.get(null, key, value, LockMode.RMW);
-            QueueTB binding = new QueueTB(_virtualHost, null);
+            TupleBinding binding = _queueTupleBindingFactory.getInstance();
 
             return (AMQQueue) binding.entryToObject(value);
         }
@@ -1038,7 +1102,7 @@
 
     public void recover() throws AMQException
     {
-        stateTransition(State.CONFIGURING, State.RECOVERING);
+        stateTransition(State.CONFIGURED, State.RECOVERING);
 
         _log.info("Recovering persistent state...");
         StoreContext context = new StoreContext();
@@ -1057,8 +1121,16 @@
         catch (DatabaseException e)
         {
             abortTran(context);
+
             throw new AMQException("Error recovering persistent state: " + e, e);
         }
+        catch (Throwable ioobe)
+        {
+            abortTran(context);
+            throw new AMQException("Invalid database format. Please use upgrade tool.", ioobe);
+        }
+
+        stateTransition(State.RECOVERING, State.STARTED);
     }
 
     /**
@@ -1254,14 +1326,17 @@
             cursor = _queueDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
-            QueueTB binding = new QueueTB(_virtualHost, null);
+            TupleBinding binding = _queueTupleBindingFactory.getInstance();
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
                 AMQQueue queue = (AMQQueue) binding.entryToObject(value);
-                _virtualHost.getQueueRegistry().registerQueue(queue);
-                queues.put(queue.getName(), queue);
-                _log.info("Recovering queue " + queue.getName() + " with owner:"
-                          + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+                if (queue != null)
+                {
+                    _virtualHost.getQueueRegistry().registerQueue(queue);
+                    queues.put(queue.getName(), queue);
+                    _log.info("Recovering queue " + queue.getName() + " with owner:"
+                              + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+                }
             }
 
             return queues;
@@ -1275,6 +1350,117 @@
         }
     }
 
+    // Public getters for the TupleBindingFactories
+
+    public QueueTupleBindingFactory getQueueTupleBindingFactory()
+    {
+        return _queueTupleBindingFactory;
+    }
+
+    public BindingTupleBindingFactory getBindingTupleBindingFactory()
+    {
+        return _bindingTupleBindingFactory;
+    }
+
+    // Package-private getters for the various databases used by the Store
+
+    Database getMetaDataDb()
+    {
+        return _messageMetaDataDb;
+    }
+
+    Database getContentDb()
+    {
+        return _messageContentDb;
+    }
+
+    Database getQueuesDb()
+    {
+        return _queueDb;
+    }
+
+    Database getDeliveryDb()
+    {
+        return _deliveryDb;
+    }
+
+    Database getExchangesDb()
+    {
+        return _exchangeDb;
+    }
+
+    Database getBindingsDb()
+    {
+        return _queueBindingsDb;
+    }
+
+    /** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
+    interface DatabaseVisitor
+    {
+        public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException;
+    }
+
+    void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQException
+    {
+        visitDatabase(_messageMetaDataDb, visitor);
+    }
+
+    void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQException
+    {
+        visitDatabase(_messageContentDb, visitor);
+    }
+
+    void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQException
+    {
+        visitDatabase(_queueDb, visitor);
+    }
+
+    void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQException
+    {
+        visitDatabase(_deliveryDb, visitor);
+    }
+
+    void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQException
+    {
+        visitDatabase(_exchangeDb, visitor);
+    }
+
+    void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQException
+    {
+        visitDatabase(_queueBindingsDb, visitor);
+    }
+
+    /**
+     * Generic visitDatabase allows iteration through the specified database.
+     *
+     * @param database The database to visit
+     * @param visitor  The visitor to give each entry to.
+     *
+     * @throws DatabaseException If there is a problem with the Database structure
+     * @throws AMQException      If there is a programming error
+     */
+    void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQException
+    {
+        Cursor cursor = database.openCursor(null, null);
+
+        try
+        {
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                visitor.visit(key, value);
+            }
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+    }
+
     private static final class ProcessAction
     {
         private final AMQQueue _queue;
@@ -1402,6 +1588,8 @@
     void setVirtualHost(VirtualHost virtualHost)
     {
         _virtualHost = virtualHost;
+
+        createTupleBindingFactories(_version);
     }
 
     void commit(Transaction tx) throws DatabaseException
@@ -1623,4 +1811,5 @@
             }
         }
     }
+
 }

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java	2008-08-29 14:36:01 UTC (rev 2367)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -1,59 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
-
-public class BindingTB extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(BindingTB.class);
-
-    private final VirtualHost _virtualHost;
-
-    public BindingTB(VirtualHost virtualHost)
-    {
-        _virtualHost = virtualHost;
-    }
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        try
-        {
-            AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
-            AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
-            AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
-            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
-            return new BindingKey(exchangeName, queueName, routingKey, arguments);
-        }
-        catch (DatabaseException e)
-        {
-            _log.error("Unable to create binding: " + e, e);
-            return null;
-        }
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        BindingKey binding = (BindingKey) object;
-
-        AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
-        FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
-
-        binding = (BindingKey) entryToObject(new TupleInput(tupleOutput.getBufferBytes()));
-
-        System.err.println(binding.getExchangeName());
-        System.err.println(binding.getQueueName());
-        System.err.println(binding.getRoutingKey());
-        System.err.println(binding.getArguments());
-
-    }
-}

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java	2008-08-29 14:36:01 UTC (rev 2367)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -1,79 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.log4j.Logger;
-
-public class QueueTB extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(QueueTB.class);
-
-    private final VirtualHost _virtualHost;
-    private final FieldTable _arguments;
-
-    public QueueTB(VirtualHost virtualHost, FieldTable arguments)
-    {
-        _virtualHost = virtualHost;
-        _arguments = arguments;
-    }
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        try
-        {
-            AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
-            AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
-            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
-            try
-            {
-                return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
-            }
-            catch (AMQException e)
-            {
-                _log.error("Unable to create queue: " + e, e);
-                return null;
-            }
-        }
-        catch (DatabaseException e)
-        {
-            _log.error("Unable to create binding: " + e, e);
-            return null;
-        }
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        AMQQueue queue = (AMQQueue) object;
-
-        AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
-        FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
-
-    }
-}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.FieldTable;
+
+public interface BindingTuple
+{
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BindingTupleBindingFactory extends TupleBindingFactory
+{
+    public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+    {
+        super(version, virtualhost);
+    }
+
+    public TupleBinding getInstance()
+    {
+        switch (_version)
+        {
+            default:
+            case 2:
+                return new BindingTuple_2(_virtualhost);
+            case 1:
+                return new BindingTuple_1(_virtualhost);
+        }
+    }
+}

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java (from rev 2292, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,60 @@
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+
+public class BindingTuple_1 extends TupleBinding implements BindingTuple
+{
+    protected static final Logger _log = Logger.getLogger(BindingTuple.class);
+
+    protected VirtualHost _virtualhost;
+
+    public BindingTuple_1(VirtualHost virtualHost)
+    {
+        if (virtualHost == null)
+        {
+            throw new NullPointerException("Virtualhost cannot be null");
+        }
+        _virtualhost = virtualHost;
+    }
+
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+
+        return createNewBindingKey(exchangeName, queueName, routingKey);
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        BindingKey binding = (BindingKey) object;
+
+        AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+    }
+
+    private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
+    {
+        return createNewBindingKey(exchangeName, queueName, routingKey, null);
+    }
+
+    // Addition for Version 2 of this table
+    protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
+                                         AMQShortString routingKey, FieldTable arguments)
+    {
+        return new BindingKey(exchangeName, queueName, routingKey, arguments);
+    }
+
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,58 @@
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+import org.apache.log4j.Logger;
+
+public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
+{
+
+    public BindingTuple_2(VirtualHost virtualHost)
+    {
+        super(virtualHost);
+    }
+
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+
+        FieldTable arguments;
+
+        // Addition for Version 2 of this table
+        try
+        {
+            arguments = FieldTableEncoding.readFieldTable(tupleInput);
+        }
+        catch (DatabaseException e)
+        {
+            _log.error("Unable to create binding: " + e, e);
+            return null;
+        }
+
+        return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        BindingKey binding = (BindingKey) object;
+        
+        AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+
+        // Addition for Version 2 of this table
+        FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
+    }
+
+}
\ No newline at end of file

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.FieldTable;
+
+/**
+ * Common interface of the versioned queue tuple bindings in this package.
+ */
+public interface QueueTuple
+{
+    /**
+     * Supplies the queue arguments to the binding. Addition for Version 2.
+     *
+     * @param arguments the queue arguments field table
+     */
+    void setArguments(FieldTable arguments);
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * Factory that selects the queue tuple binding matching a store version.
+ */
+public class QueueTupleBindingFactory extends TupleBindingFactory
+{
+    public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+    {
+        super(version, virtualHost);
+    }
+
+    /**
+     * Creates a new binding for the configured version.
+     *
+     * @return a QueueTuple_1 for version 1; a QueueTuple_2 for version 2 and
+     *         for every other version value
+     */
+    public TupleBinding getInstance()
+    {
+        if (_version == 1)
+        {
+            return new QueueTuple_1(_virtualhost);
+        }
+
+        // Version 2 is also the fallback for any unrecognised version.
+        return new QueueTuple_2(_virtualhost);
+    }
+}

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java (from rev 2292, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.log4j.Logger;
+
+/**
+ * Version 1 of the queue tuple layout: the queue name followed by the owner,
+ * both stored as short strings.
+ */
+public class QueueTuple_1 extends TupleBinding implements QueueTuple
+{
+    protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
+
+    protected final VirtualHost _virtualHost;
+
+    /**
+     * @param virtualHost the virtual host passed to the queue factory when
+     *                    queues are created; must not be null
+     * @throws NullPointerException if virtualHost is null
+     */
+    public QueueTuple_1(VirtualHost virtualHost)
+    {
+        if (virtualHost == null)
+        {
+            throw new NullPointerException("Virtualhost cannot be null");
+        }
+        _virtualHost = virtualHost;
+    }
+
+    /**
+     * Reads one version 1 queue entry and creates the queue it describes.
+     *
+     * @param tupleInput the stored entry to decode
+     * @return the created queue, or null if creation failed
+     */
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        // Arguments are evaluated left to right, so the name is read before the owner.
+        return createNewQueue(AMQShortStringEncoding.readShortString(tupleInput),
+                              AMQShortStringEncoding.readShortString(tupleInput));
+    }
+
+    /**
+     * Writes a queue as a version 1 entry.
+     *
+     * @param object      the queue to store; cast to AMQQueue
+     * @param tupleOutput the output the entry is written to
+     */
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        final AMQQueue source = (AMQQueue) object;
+
+        AMQShortStringEncoding.writeShortString(source.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(source.getOwner(), tupleOutput);
+    }
+
+    /**
+     * Addition for Version 2 of this table. Version 1 entries carry no
+     * arguments, so the value is ignored here.
+     */
+    public void setArguments(FieldTable arguments)
+    {
+        //no-op
+    }
+
+    /**
+     * Creates a queue without arguments by delegating to the three-argument
+     * variant with null arguments.
+     */
+    protected Object createNewQueue(AMQShortString name, AMQShortString owner)
+    {
+        return createNewQueue(name, owner, null);
+    }
+
+    /**
+     * Addition for Version 2 of this table. Creates the queue through
+     * AMQQueueFactory on this binding's virtual host.
+     *
+     * @return the created queue, or null if the factory threw an AMQException
+     *         (the failure is logged)
+     */
+    protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
+    {
+        Object created;
+        try
+        {
+            created = AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+        }
+        catch (AMQException e)
+        {
+            _logger.error("Unable to create queue: " + e, e);
+            created = null;
+        }
+        return created;
+    }
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
___________________________________________________________________
Name: svn:eol-style
   + native

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+/**
+ * Version 2 of the queue tuple layout: the queue name and owner as short
+ * strings, followed by the queue arguments as a field table. The field table
+ * is the addition made for version 2 of this table.
+ */
+public class QueueTuple_2 extends QueueTuple_1
+{
+    /** Arguments written by objectToEntry; supplied through setArguments. */
+    protected FieldTable _arguments;
+
+    public QueueTuple_2(VirtualHost virtualHost)
+    {
+        super(virtualHost);
+    }
+
+    /**
+     * Reads one version 2 queue entry and creates the queue it describes.
+     *
+     * @param tupleInput the stored entry to decode
+     * @return the created queue, or null if the entry could not be read or
+     *         the queue could not be created (the failure is logged)
+     */
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+            AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+            // Addition for Version 2 of this table
+            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+            return createNewQueue(name, owner, arguments);
+        }
+        catch (DatabaseException e)
+        {
+            // This binding restores queues, so report the failure as a queue
+            // problem (the message previously said "binding").
+            _logger.error("Unable to create queue: " + e, e);
+            return null;
+        }
+
+    }
+
+    /**
+     * Writes a queue as a version 2 entry. The arguments written are the ones
+     * last passed to setArguments, not a value read from the queue object.
+     * NOTE(review): _arguments is null if setArguments was never called;
+     * confirm FieldTableEncoding.writeFieldTable accepts null.
+     *
+     * @param object      the queue to store; cast to AMQQueue
+     * @param tupleOutput the output the entry is written to
+     */
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        AMQQueue queue = (AMQQueue) object;
+
+        AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        // Addition for Version 2 of this table
+        FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
+    }
+
+    /**
+     * Addition for Version 2 of this table. Stores the arguments that the
+     * next objectToEntry call will write.
+     *
+     * @param arguments the queue arguments field table
+     */
+    public void setArguments(FieldTable arguments)
+    {
+        _arguments = arguments;
+    }
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java	2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+/**
+ * Base class for factories that hand out a TupleBinding for a given store
+ * version and virtual host.
+ */
+public abstract class TupleBindingFactory
+{
+    /** Store version used by subclasses to pick a binding. */
+    protected int _version;
+
+    /** Virtual host handed to the bindings that subclasses create. */
+    protected VirtualHost _virtualhost;
+
+    /**
+     * @param version     the store version
+     * @param virtualhost the virtual host; must not be null
+     * @throws NullPointerException if virtualhost is null
+     */
+    public TupleBindingFactory(int version, VirtualHost virtualhost)
+    {
+        if (virtualhost == null)
+        {
+            throw new NullPointerException("Virtualhost cannot be null");
+        }
+
+        _virtualhost = virtualhost;
+        _version = version;
+    }
+
+    /**
+     * @return a binding chosen by the concrete factory
+     */
+    public abstract TupleBinding getInstance();
+}




More information about the rhmessaging-commits mailing list