[rhmessaging-commits] rhmessaging commits: r3117 - in store/branches/java/broker-queue-refactor/java/bdbstore/src: test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Feb 13 06:41:24 EST 2009


Author: ritchiem
Date: 2009-02-13 06:41:24 -0500 (Fri, 13 Feb 2009)
New Revision: 3117

Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
Updates based on QPID-1628 and QPID-1629, which remove the MessageHandles and the MessageHandleFactory.

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-02-11 13:45:15 UTC (rev 3116)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-02-13 11:41:24 UTC (rev 3117)
@@ -23,13 +23,20 @@
 import com.sleepycat.bind.EntryBinding;
 import com.sleepycat.bind.tuple.ByteBinding;
 import com.sleepycat.bind.tuple.TupleBinding;
-
-import com.sleepycat.je.*;
-
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
 import org.apache.commons.configuration.Configuration;
-
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -37,15 +44,17 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.PersistentAMQMessage;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.TransientAMQMessage;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
 import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -58,8 +67,8 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -126,8 +135,11 @@
     private QueueTupleBindingFactory _queueTupleBindingFactory;
     private BindingTupleBindingFactory _bindingTupleBindingFactory;
 
+    MessageFactory _messageFactory;
+
     /** The data version this store should run with */
     private int _version;
+
     private enum State
     {
         INITIAL,
@@ -220,6 +232,8 @@
 
         boolean newEnvironment = setupStore(environmentPath, readonly);
 
+        _messageFactory = MessageFactory.getInstance();
+
         // Performing recovery when we only want read access will cause all the broker objects to be recreated
         // This will/may include thread pool creations that may be duplicated when manually inspecting the state of
         // the store. Simplest solution is to prevent the initial creation of the state by blocking recovery.
@@ -340,10 +354,10 @@
         envConfig.setTransactional(true);
         envConfig.setConfigParam("je.lock.nLockTables", "7");
         envConfig.setLockTimeout(15000);
-        
+
         // Set transaction mode
         _transactionConfig.setReadCommitted(true);
-        
+
         //This prevents background threads running which will potentially update the store.
         envConfig.setReadOnly(readonly);
         try
@@ -357,7 +371,7 @@
             {
                 //Allow the creation this time
                 envConfig.setAllowCreate(true);
-                if (_environment != null )
+                if (_environment != null)
                 {
                     _environment.cleanLog();
                     _environment.close();
@@ -648,7 +662,7 @@
             {
                 _log.info("Restoring binding: (Exchange: " + binding.getExchangeName() + ", Queue: " + binding
                         .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
-                                        + ")");
+                          + ")");
 
                 queue.bind(exchange, binding.getRoutingKey(), binding.getArguments());
             }
@@ -992,7 +1006,7 @@
         {
             _log.debug("Message Id: " + messageId + " Dequeue");
         }
-        
+
         try
         {
 
@@ -1000,7 +1014,7 @@
             if (status == OperationStatus.NOTFOUND)
             {
                 throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
-            } 
+            }
             else if (status != OperationStatus.SUCCESS)
             {
                 throw new AMQException("Unable to remove message with id " + messageId + " on queue " + name);
@@ -1065,7 +1079,7 @@
         {
             try
             {
-                context.setPayload(_environment.beginTransaction(null, _transactionConfig ));
+                context.setPayload(_environment.beginTransaction(null, _transactionConfig));
             }
             catch (DatabaseException e)
             {
@@ -1437,6 +1451,16 @@
         return true;
     }
 
+    public void flow(Long messageId)
+    {
+        // No-op stub: this method has an empty body, so the call does nothing for the given messageId.
+    }
+
+    public void recover(Long messageId)
+    {
+        // No-op stub: this method has an empty body, so the call does nothing for the given messageId.
+    }
+
     Map<AMQShortString, AMQQueue> loadQueues() throws DatabaseException, AMQException
     {
         Cursor cursor = null;
@@ -1598,7 +1622,7 @@
     private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
             throws DatabaseException, AMQException
     {
-        Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
+        Map<Long, PersistentAMQMessage> msgMap = new HashMap<Long, PersistentAMQMessage>();
         List<ProcessAction> actions = new ArrayList<ProcessAction>();
 
         Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1613,7 +1637,6 @@
 
             DatabaseEntry value = new DatabaseEntry();
             EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
-            MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
             long maxId = 1;
 
             TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
@@ -1635,7 +1658,7 @@
 
                 long messageId = dd.messageId;
                 maxId = Math.max(maxId, messageId);
-                AMQMessage message = msgMap.get(messageId);
+                PersistentAMQMessage message = msgMap.get(messageId);
 
                 if (message != null)
                 {
@@ -1643,8 +1666,23 @@
                 }
                 else
                 {
-                    message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
+
+                    message = (PersistentAMQMessage) _messageFactory.createMessage(messageId, this, true);
                     msgMap.put(messageId, message);
+
+                    MessageMetaData mmd = getMessageMetaData(context, messageId);
+                    //Recover HeaderBody
+                    message.recoverFromMessageMetaData(mmd);
+
+                    //Recover Bodies
+                    int count = mmd.getContentChunkCount();
+
+                    for (int index = 0; index < count; index++)
+                    {
+                        ContentChunk cc = getContentBodyChunk(context, messageId, index);
+                        message.recoverContentBodyFrame(cc, index == count);
+                    }
+
                 }
 
                 if (_log.isDebugEnabled())

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java	2009-02-11 13:45:15 UTC (rev 3116)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java	2009-02-13 11:41:24 UTC (rev 3117)
@@ -23,14 +23,10 @@
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 
-import org.apache.mina.common.ByteBuffer;
-
-/**
- * @author Robert Greig (robert.j.greig at jpmorgan.com)
- */
+/** @author Robert Greig (robert.j.greig at jpmorgan.com) */
 public class ContentTB extends TupleBinding
 {
     public Object entryToObject(TupleInput tupleInput)
@@ -39,7 +35,7 @@
         final int size = tupleInput.readInt();
         byte[] underlying = new byte[size];
         tupleInput.readFast(underlying);
-        final ByteBuffer data  = ByteBuffer.wrap(underlying);
+        final ByteBuffer data = ByteBuffer.wrap(underlying);
         ContentChunk cb = new ContentChunk()
         {
 
@@ -70,7 +66,7 @@
         ByteBuffer buf = cb.getData();
 
         buf.duplicate().rewind().get(underlying);
-
+        
         tupleOutput.writeInt(size);
         tupleOutput.writeFast(underlying);
     }

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	2009-02-11 13:45:15 UTC (rev 3116)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	2009-02-13 11:41:24 UTC (rev 3117)
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 
 /**
  * Handles the mapping to and from message meta data
@@ -76,35 +77,7 @@
         final boolean mandatory = tupleInput.readBoolean();
         final boolean immediate = tupleInput.readBoolean();
 
-        return new MessagePublishInfo()
-        {
-
-            public AMQShortString getExchange()
-            {
-                return exchange;
-            }
-
-            public void setExchange(AMQShortString exchange)
-            {
-
-            }
-
-            public boolean isImmediate()
-            {
-                return immediate;
-            }
-
-            public boolean isMandatory()
-            {
-                return mandatory;
-            }
-
-            public AMQShortString getRoutingKey()
-            {
-                return routingKey;
-            }
-        }   ;
-
+        return new MessagePublishInfoImpl(exchange,immediate,mandatory,routingKey);        
     }
 
 

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2009-02-11 13:45:15 UTC (rev 3116)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2009-02-13 11:41:24 UTC (rev 3117)
@@ -19,7 +19,6 @@
 
 import com.sleepycat.je.DatabaseException;
 import junit.framework.Assert;
-import junit.framework.TestCase;
 import junit.framework.TestSuite;
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
@@ -28,6 +27,7 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
@@ -39,6 +39,7 @@
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.MockContentChunk;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
@@ -55,7 +56,7 @@
 
     private BDBMessageStore _store;
     private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
-    
+
     private StoreContext _storeContext = new StoreContext();
     private VirtualHost _virtualHost;
 
@@ -176,36 +177,7 @@
 
     private MessagePublishInfo createPublishBody()
     {
-
-        return new MessagePublishInfo()
-        {
-
-            public AMQShortString getExchange()
-            {
-                return MYEXCHANGE;
-            }
-
-            public void setExchange(AMQShortString exchange)
-            {
-
-            }
-
-            public boolean isImmediate()
-            {
-                return false;
-            }
-
-            public boolean isMandatory()
-            {
-                return true;
-            }
-
-            public AMQShortString getRoutingKey()
-            {
-                return RK;
-            }
-        };
-
+        return new MessagePublishInfoImpl(MYEXCHANGE,false,true,RK);
     }
 
     private BasicContentHeaderProperties createContentHeaderProperties()
@@ -440,8 +412,10 @@
 
         _store.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb, 0));
         _store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 0));
+        _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 1));
 
+        _store.storeContentBodyChunk(_storeContext, 42L, 0, new MockContentChunk(), true);
+
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
 




More information about the rhmessaging-commits mailing list