[jboss-cvs] JBoss Messaging SVN: r3415 - in branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore: impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 5 14:46:16 EST 2007


Author: timfox
Date: 2007-12-05 14:46:16 -0500 (Wed, 05 Dec 2007)
New Revision: 3415

Added:
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java
Log:
Missing files


Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,420 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.util.StreamUtils;
+
+/**
+ * A concrete implementation of a message
+ * 
+ * All messages handled by JBM servers are of this type
+ * 
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2740 $</tt>
+ * 
+ * Note this class is only serializable so messages can be returned from JMX operations
+ * e.g. listAllMessages.
+ * 
+ * For normal message transportation serialization is not used
+ * 
+ * $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
+ */
+public class MessageImpl implements Message
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(MessageImpl.class);
+
+   // Attributes ----------------------------------------------------
+
+   private boolean trace = log.isTraceEnabled();
+
+   private long messageID;
+   
+   private int type;
+   
+   private boolean reliable;
+
+   /** GMT milliseconds at which this message expires. 0 means never expires * */
+   private long expiration;
+
+   private long timestamp;
+
+   private Map<String, Object> headers;
+
+   private byte priority;
+
+   //The payload of MessageImpl instances is opaque
+   private byte[] payload;
+   
+   //We keep track of the persisted references for this message
+   private transient List<MessageReference> references = new ArrayList<MessageReference>();
+   
+   private String destination;
+   
+   private String connectionID;
+         
+   // Constructors --------------------------------------------------
+
+   /*
+    * Construct a message for deserialization or streaming
+    */
+   public MessageImpl()
+   {
+   }
+
+   public MessageImpl(long messageID, int type, boolean reliable, long expiration,
+                      long timestamp, byte priority)
+   {
+      this.messageID = messageID;
+      this.reliable = reliable;
+      this.expiration = expiration;
+      this.timestamp = timestamp;
+      this.priority = priority;
+      
+      this.headers = new HashMap<String, Object>();
+   }
+
+   /*
+    * Construct a MessageImpl from storage
+    */
+   public MessageImpl(long messageID, int type, boolean reliable, long expiration,
+                      long timestamp, byte priority, byte[] headers, byte[] payload)
+      throws Exception
+   {
+      this.messageID = messageID;
+      this.reliable = reliable;
+      this.expiration = expiration;
+      this.timestamp = timestamp;
+      this.priority = priority;
+      
+      if (headers == null)
+      {
+         this.headers = new HashMap<String, Object>();
+      }
+      else
+      {
+         //TODO keep headers opaque on server
+         ByteArrayInputStream bis = new ByteArrayInputStream(headers);
+
+         DataInputStream dais = new DataInputStream(bis);
+
+         this.headers = StreamUtils.readMap(dais, true);
+
+         dais.close();
+      }
+      this.payload = payload;
+   }
+   
+   /**
+    * Copy constructor
+    * 
+    * @param other
+    */
+   public MessageImpl(MessageImpl other)
+   {
+      this.messageID = other.messageID;
+      this.reliable = other.reliable;
+      this.expiration = other.expiration;
+      this.timestamp = other.timestamp;
+      this.priority = other.priority;
+      this.headers = new HashMap<String, Object>(other.headers);
+      this.payload = other.payload;
+   }
+   
+   // Message implementation ----------------------------------------
+
+   public long getMessageID()
+   {
+      return messageID;
+   }
+   
+   public void setMessageID(long id)
+   {
+      this.messageID = id;
+   }
+   
+   public String getDestination()
+   {
+      return destination;
+   }
+   
+   public void setDestination(String destination)
+   {
+      this.destination = destination;
+   }
+   
+   public int getType()
+   {
+      return type;
+   }
+
+   public boolean isReliable()
+   {
+      return reliable;
+   }
+   
+   public void setReliable(boolean reliable)
+   {
+      this.reliable = reliable;
+   }
+
+   public long getExpiration()
+   {
+      return expiration;
+   }
+
+   public void setExpiration(long expiration)
+   {
+      this.expiration = expiration;
+   }
+
+   public long getTimestamp()
+   {
+      return timestamp;
+   }
+   
+   public void setTimestamp(long timestamp)
+   {
+      this.timestamp = timestamp;
+   }
+
+   public Object putHeader(String name, Object value)
+   {
+      return headers.put(name, value);
+   }
+
+   public Object getHeader(String name)
+   {
+      return headers.get(name);
+   }
+
+   public Object removeHeader(String name)
+   {
+      return headers.remove(name);
+   }
+
+   public boolean containsHeader(String name)
+   {
+      return headers.containsKey(name);
+   }
+
+   public Map<String, Object> getHeaders()
+   {
+      return headers;
+   }
+
+   public byte getPriority()
+   {
+      return priority;
+   }
+
+   public void setPriority(byte priority)
+   {
+      this.priority = priority;
+   }
+
+   // TODO - combine with getPayloadAsByteArray to get one big blob
+   public byte[] getHeadersAsByteArray() throws Exception
+   {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+
+      DataOutputStream oos = new DataOutputStream(bos);
+
+      StreamUtils.writeMap(oos, headers, true);
+
+      oos.close();
+
+      return bos.toByteArray();
+   }
+
+   public byte[] getPayload()
+   {     
+      return payload;
+   }
+   
+   public void setPayload(byte[] payload)
+   {
+      this.payload = payload;
+   }
+   
+   public String getConnectionID()
+   {
+      return connectionID;
+   }
+   
+   public void setConnectionID(String connectionID)
+   {
+      this.connectionID = connectionID;
+   }
+
+   public boolean isExpired()
+   {
+      if (expiration == 0)
+      {
+         return false;
+      }
+      
+      long overtime = System.currentTimeMillis() - expiration;
+      
+      if (overtime >= 0)
+      {
+         return true;
+      }
+      return false;
+   }
+   
+   public MessageReference createReference(Queue queue)
+   {
+      MessageReference ref =  new MessageReferenceImpl(this, queue);
+      
+      references.add(ref);
+      
+      return ref;
+   }
+   
+   public List<MessageReference> getReferences()
+   {
+      return references;
+   }
+   
+   public Message copy()
+   {
+      return new MessageImpl(this);
+   }
+   
+   // Public --------------------------------------------------------
+
+   public boolean equals(Object o)
+   {
+      if (this == o)
+      {
+         return true;
+      }
+      
+      if (!(o instanceof MessageImpl))
+      {
+         return false;
+      }
+      
+      MessageImpl that = (MessageImpl) o;
+      
+      return that.messageID == this.messageID;
+   }
+
+   public int hashCode()
+   {
+      return (int) ((this.messageID >>> 32) ^ this.messageID);
+   }
+
+   public String toString()
+   {
+      return "M[" + messageID + "]";
+   }
+
+   // Streamable implementation ---------------------------------
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      out.writeLong(messageID);
+      
+      out.writeUTF(destination);
+      
+      out.writeInt(type);
+
+      out.writeBoolean(reliable);
+
+      out.writeLong(expiration);
+
+      out.writeLong(timestamp);
+
+      StreamUtils.writeMap(out, headers, true);
+
+      out.writeByte(priority);
+
+      if (payload != null)
+      {
+         out.writeInt(payload.length);
+
+         out.write(payload);
+      }
+      else
+      {
+         out.writeInt(0);
+      }
+   }
+
+   public void read(DataInputStream in) throws Exception
+   {
+      messageID = in.readLong();
+      
+      destination = in.readUTF();
+      
+      type = in.readInt();
+
+      reliable = in.readBoolean();
+
+      expiration = in.readLong();
+
+      timestamp = in.readLong();
+
+      headers = StreamUtils.readMap(in, true);
+
+      priority = in.readByte();
+
+      int length = in.readInt();
+
+      if (length == 0)
+      {
+         // no payload
+         payload = null;
+      }
+      else
+      {
+         payload = new byte[length];
+
+         in.readFully(payload);
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,132 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.newcore.impl;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+
+/**
+ * Implementation of a MessageReference
+ *
+ * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @version <tt>1.3</tt>
+ *
+ * MessageReferenceImpl.java,v 1.3 2006/02/23 17:45:57 timfox Exp
+ */
+public class MessageReferenceImpl implements MessageReference
+{   
+   private static final Logger log = Logger.getLogger(MessageReferenceImpl.class);
+   
+   // Attributes ----------------------------------------------------
+
+   private boolean trace = log.isTraceEnabled();
+   
+   private int deliveryCount;   
+   
+   private long scheduledDeliveryTime;
+   
+   private Message message;
+   
+   private Queue queue;
+   
+   // Constructors --------------------------------------------------
+
+   /**
+    * Required by externalization.
+    */
+   public MessageReferenceImpl()
+   {
+      if (trace) { log.trace("Creating using default constructor"); }
+   }
+
+   public MessageReferenceImpl(MessageReferenceImpl other, Queue queue)
+   {
+      this.deliveryCount = other.deliveryCount;
+      
+      this.scheduledDeliveryTime = other.scheduledDeliveryTime;       
+      
+      this.message = other.message;
+      
+      this.queue = queue;
+   }
+   
+   protected MessageReferenceImpl(Message message, Queue queue)
+   {
+   	this.message = message;
+   	
+   	this.queue = queue;
+   }   
+   
+   // MessageReference implementation -------------------------------
+   
+   public MessageReference copy(Queue queue)
+   {
+   	return new MessageReferenceImpl(this, queue);
+   }
+   
+   public int getDeliveryCount()
+   {
+      return deliveryCount;
+   }
+   
+   public void setDeliveryCount(int deliveryCount)
+   {
+      this.deliveryCount = deliveryCount;
+   }
+   
+   public long getScheduledDeliveryTime()
+   {
+      return scheduledDeliveryTime;
+   }
+
+   public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+   {
+      this.scheduledDeliveryTime = scheduledDeliveryTime;
+   }
+      
+   public Message getMessage()
+   {
+      return message;
+   }         
+   
+   public Queue getQueue()
+   {
+      return queue;
+   }
+   
+   // Public --------------------------------------------------------
+
+   public String toString()
+   {
+      return "Reference[" + getMessage().getMessageID() + "]:" + (getMessage().isReliable() ? "RELIABLE" : "NON-RELIABLE");
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------   
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
\ No newline at end of file

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,508 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+
+import org.jboss.jms.server.MessagingTimeoutFactory;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Consumer;
+import org.jboss.messaging.newcore.intf.DistributionPolicy;
+import org.jboss.messaging.newcore.intf.Filter;
+import org.jboss.messaging.newcore.intf.HandleStatus;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
+import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedListImpl;
+import org.jboss.util.timeout.Timeout;
+import org.jboss.util.timeout.TimeoutTarget;
+
+/**
+ * 
+ * A standard non clustered Queue implementation
+ * 
+ * TODO use Java 5 concurrent queue
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class QueueImpl implements Queue
+{
+   private static final Logger log = Logger.getLogger(QueueImpl.class);
+
+   private static final boolean trace = log.isTraceEnabled();
+   
+   protected long id;
+   
+   protected int maxSize = -1;
+   
+   protected Filter filter;
+   
+   protected PriorityLinkedList<MessageReference> messageReferences;
+   
+   protected List<Consumer> consumers;
+   
+   protected Set<Timeout> scheduledTimeouts;
+   
+   protected DistributionPolicy distributionPolicy;
+   
+   protected boolean direct;
+   
+   protected boolean promptDelivery;
+   
+   private int pos;
+   
+   public QueueImpl(long id)
+   {
+      this(id, null);
+   }
+      
+   public QueueImpl(long id, Filter filter)
+   {
+      this.id = id;
+      
+      this.filter = filter;
+      
+      //TODO - use a wait free concurrent queue
+      messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
+      
+      consumers = new ArrayList<Consumer>();
+      
+      scheduledTimeouts = new HashSet<Timeout>();
+      
+      distributionPolicy = new RoundRobinDistributionPolicy();
+      
+      direct = true;
+   }
+   
+   public QueueImpl(long id, Filter filter, int maxSize)
+   {
+      this(id, filter);
+      
+      this.maxSize = maxSize;
+   }
+   
+   // Queue implementation -------------------------------------------------------------------
+      
+   public synchronized HandleStatus addLast(MessageReference ref)
+   {
+      return add(ref, false);
+   }
+
+   public synchronized HandleStatus addFirst(MessageReference ref)
+   {
+      return add(ref, true);
+   }
+   
+           
+   /*
+    * Attempt to deliver all the messages in the queue
+    * @see org.jboss.messaging.newcore.intf.Queue#deliver()
+    */
+   public synchronized void deliver()
+   {
+      MessageReference reference;
+      
+      ListIterator<MessageReference> iterator = null;
+      
+      while (true)
+      {
+         if (iterator == null)
+         {
+            reference = messageReferences.peekFirst();
+         }
+         else
+         {
+            if (iterator.hasNext())
+            {
+               reference = iterator.next();
+            }
+            else
+            {
+               reference = null;
+            }
+         }
+         
+         if (reference == null)
+         {
+            if (iterator == null)
+            {
+               //We delivered all the messages - go into direct delivery
+               direct = true;
+            }
+            return;
+         }
+         
+         HandleStatus status = deliver(reference);
+         
+         if (status == HandleStatus.HANDLED)
+         {
+            if (iterator == null)
+            {
+               messageReferences.removeFirst();
+            }
+            else
+            {
+               iterator.remove();
+            }
+         }
+         else if (status == HandleStatus.BUSY)
+         {
+            //All consumers busy - give up
+            break;
+         }
+         else if (status == HandleStatus.NO_MATCH && iterator == null)
+         {
+            //Consumers not all busy - but filter not accepting - iterate back through the queue
+            iterator = messageReferences.iterator();
+         }
+      }               
+   }
+   
+   public synchronized void addConsumer(Consumer consumer)
+   {
+      consumers.add(consumer);
+   }
+   
+   public synchronized boolean removeConsumer(Consumer consumer)
+   {
+      boolean removed = consumers.remove(consumer);
+      
+      if (pos == consumers.size())
+      {
+         pos = 0;
+      }
+      
+      if (consumers.isEmpty())
+      {
+         promptDelivery = false;
+      }
+      
+      return removed;
+   }
+   
+   public synchronized boolean deliverScheduled(MessageReference reference)
+   {
+      //TODO
+      
+      return false;
+   }
+   
+   public synchronized int getConsumerCount()
+   {
+      return consumers.size();
+   }
+
+   public synchronized List<MessageReference> list(Filter filter)
+   {
+      if (filter == null)
+      {
+         return new ArrayList<MessageReference>(messageReferences.getAll());
+      }
+      else
+      {
+         ArrayList<MessageReference> list = new ArrayList<MessageReference>();
+         
+         for (MessageReference ref: messageReferences.getAll())
+         {
+            if (filter.match(ref.getMessage()))
+            {
+               list.add(ref);
+            }
+         }
+         
+         return list;
+      }
+   }
+
+   public synchronized void removeAllReferences()
+   {
+      messageReferences.clear();
+      
+      if (!this.scheduledTimeouts.isEmpty())
+      {
+         Set<Timeout> clone = new HashSet<Timeout>(scheduledTimeouts);
+         
+         for (Timeout timeout: clone)
+         {
+            timeout.cancel();
+         }
+         
+         scheduledTimeouts.clear();
+      }
+   }
+
+   public long getID()
+   {
+      return id;
+   }
+   
+   public synchronized Filter getFilter()
+   {
+      return filter;
+   }
+   
+   public synchronized void setFilter(Filter filter)
+   {
+      this.filter = filter;
+   }
+
+   public synchronized int getMessageCount()
+   {
+      return messageReferences.size();
+   }
+   
+   public synchronized int getScheduledCount()
+   {
+      return scheduledTimeouts.size();
+   }
+
+   public synchronized int getMaxSize()
+   {
+      return maxSize;
+   }
+
+   public synchronized void setMaxSize(int maxSize)
+   {
+      int num = messageReferences.size() + scheduledTimeouts.size();
+      
+      if (maxSize < num)
+      {
+         throw new IllegalArgumentException("Cannot set maxSize to " + maxSize + " since there are " + num + " refs");
+      }
+      this.maxSize = maxSize;
+   }
+     
+   public synchronized DistributionPolicy getDistributionPolicy()
+   {
+      return distributionPolicy;
+   }
+
+   public synchronized void setDistributionPolicy(DistributionPolicy distributionPolicy)
+   {
+      this.distributionPolicy = distributionPolicy;
+   }
+              
+   // Private ------------------------------------------------------------------------------
+   
+   private HandleStatus add(MessageReference ref, boolean first)
+   {
+      if (!checkFull())
+      {
+         return HandleStatus.BUSY;
+      }
+      
+      if (filter != null)
+      {
+         if (!filter.match(ref.getMessage()))
+         {
+            return HandleStatus.NO_MATCH;
+         }
+      }
+      
+      if (!checkAndSchedule(ref))
+      {           
+         boolean add = false;
+         
+         if (direct)
+         {
+            //Deliver directly
+            
+            HandleStatus status = deliver(ref);
+            
+            if (status == HandleStatus.HANDLED)
+            {
+               //Ok
+            }
+            else if (status == HandleStatus.BUSY)
+            {
+               add = true;
+            }
+            else if (status == HandleStatus.NO_MATCH)
+            {
+               add = true;
+               
+               promptDelivery = true;
+            }
+            
+            if (add)
+            {
+               direct = false;
+            }
+         }
+         else
+         {
+            add = true;
+         }
+         
+         if (add)
+         {
+            if (first)
+            {
+               messageReferences.addFirst(ref, ref.getMessage().getPriority());
+            }
+            else
+            {
+               messageReferences.addLast(ref, ref.getMessage().getPriority());
+            }
+            
+            if (!direct && promptDelivery)
+            {
+               //We have consumers with filters which don't match, so we need to prompt delivery every time
+               //a new message arrives - this is why you really shouldn't use filters with queues - in most cases
+               //it's an ant-pattern since it would cause a queue scan on each message
+               deliver();
+            }
+         }
+      }
+      
+      return HandleStatus.HANDLED;
+   }
+             
+   private boolean checkAndSchedule(MessageReference ref)
+   {
+      if (ref.getScheduledDeliveryTime() > System.currentTimeMillis())
+      {      
+         if (trace) { log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime()); }
+         
+         // Schedule the cancel to actually occur at the specified time. 
+            
+         Timeout timeout =
+            MessagingTimeoutFactory.instance.getFactory().
+               schedule(ref.getScheduledDeliveryTime(), new DeliverRefTimeoutTarget(ref));
+         
+         scheduledTimeouts.add(timeout);
+                       
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+   
+   private boolean checkFull()
+   {
+      if (maxSize != -1 && (messageReferences.size() + scheduledTimeouts.size()) >= maxSize)
+      {
+         if (trace) { log.trace(this + " queue is full, rejecting message"); }
+         
+         return false;
+      }
+      else
+      {
+         return true;
+      }
+   }
+   
+   private HandleStatus deliver(MessageReference reference)
+   {
+      if (consumers.isEmpty())
+      {
+         return HandleStatus.BUSY;
+      }
+      
+      int startPos = pos;
+      
+      boolean filterRejected = false;
+      
+      while (true)
+      {               
+         Consumer consumer = consumers.get(pos);
+         
+         pos = distributionPolicy.select(consumers, pos);                  
+                  
+         HandleStatus status = consumer.handle(reference);
+         
+         if (status == HandleStatus.HANDLED)
+         {
+            return HandleStatus.HANDLED;
+         }
+         else if (status == HandleStatus.NO_MATCH)
+         {
+            filterRejected = true;
+         }       
+         
+         if (pos == startPos)
+         {
+            //Tried all of them
+            if (filterRejected)
+            {
+               return HandleStatus.NO_MATCH;
+            }
+            else
+            {
+               //Give up - all consumers busy
+               return HandleStatus.BUSY;
+            }
+         }
+      }     
+   }
+   
+   // Inner classes --------------------------------------------------------------------------
+   
+   private class DeliverRefTimeoutTarget implements TimeoutTarget
+   {
+      private MessageReference ref;
+
+      public DeliverRefTimeoutTarget(MessageReference ref)
+      {
+         this.ref = ref;
+      }
+
+      public void timedOut(Timeout timeout)
+      {
+         if (trace) { log.trace("Scheduled delivery timeout " + ref); }
+         
+         synchronized (scheduledTimeouts)
+         {
+            boolean removed = scheduledTimeouts.remove(timeout);
+            
+            if (!removed)
+            {
+               throw new IllegalStateException("Failed to remove timeout " + timeout);
+            }
+         }
+              
+         ref.setScheduledDeliveryTime(0);
+         
+         HandleStatus status = deliver(ref);
+                  
+         if (HandleStatus.HANDLED != status)
+         {
+            //Add back to the front of the queue
+            
+            addFirst(ref);
+         }
+         else
+         {
+            if (trace) { log.trace("Delivered scheduled delivery at " + System.currentTimeMillis() + " for " + ref); }
+         }
+      }
+   }
+   
+
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.util.List;
+
+import org.jboss.messaging.newcore.intf.Consumer;
+import org.jboss.messaging.newcore.intf.DistributionPolicy;
+
+/**
+ * 
+ * A RoundRobinDistributionPolicy
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RoundRobinDistributionPolicy implements DistributionPolicy
+{   
+   public int select(List<Consumer> consumers, int pos)
+   {
+      if (pos == -1)
+      {
+         //First time
+         pos = 0;
+      }
+      else
+      {
+         pos++;
+         
+         if (pos == consumers.size())
+         {
+            pos = 0;
+         }
+      }
+      return pos;
+   }
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,214 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.PersistenceManager;
+import org.jboss.messaging.newcore.intf.Transaction;
+import org.jboss.messaging.newcore.intf.TransactionSynchronization;
+
+/**
+ * 
+ * A TransactionImpl
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TransactionImpl implements Transaction
+{
+   private static final Logger log = Logger.getLogger(TransactionImpl.class);
+   
+   private static final boolean trace = log.isTraceEnabled();
+      
+   private List<MessageReference> refsToAdd;
+   
+   private List<MessageReference> refsToRemove;
+   
+   private List<TransactionSynchronization> synchronizations = new ArrayList<TransactionSynchronization>();
+   
+   private Xid xid;
+   
+   private boolean containsPersistent;
+   
+   public TransactionImpl(List<MessageReference> refsToAdd, List<MessageReference> refsToRemove,
+                          boolean containsPersistent)
+   {
+      this.refsToAdd = refsToAdd;
+      
+      this.refsToRemove = refsToRemove;
+      
+      this.containsPersistent = containsPersistent;
+   }
+   
+   // Transaction implementation -----------------------------------------------------------
+   
+   public void addSynchronization(TransactionSynchronization sync)
+   {
+      synchronizations.add(sync);
+   }
+   
+   public void prepare(PersistenceManager persistenceManager)
+   {
+      if (xid == null)
+      {
+         throw new IllegalStateException("Cannot call prepare() on a non XA transaction");
+      }
+      else
+      {
+         
+      }
+   }
+   
+   public void commit(PersistenceManager persistenceManager) throws Exception
+   {
+      callSynchronizations(SyncType.BEFORE_COMMIT);
+            
+      if (containsPersistent)
+      {
+         if (xid == null)
+         {
+            //1PC commit
+            PersistenceTransaction tx = null;
+            
+            try
+            {         
+               tx = persistenceManager.createTransaction(false);
+               
+               playOperations(tx, persistenceManager);
+               
+               tx.commit();
+                                                                          
+            }
+            catch (Exception e)
+            {
+               try
+               {
+                  tx.rollback();
+               }
+               catch (Throwable t)
+               {
+                  if (trace) { log.trace("Failed to rollback", t); }
+               }
+               throw e;
+            }
+         }
+         else
+         {
+            //2PC commit
+            
+            PersistenceTransaction tx = null;
+            
+            tx = persistenceManager.getTransaction(xid);
+            
+            playOperations(tx, persistenceManager);
+            
+            tx.commit();
+            
+            
+         } 
+      }
+      
+      
+      //Now add to queue(s)
+      
+      for (MessageReference reference: refsToAdd)
+      {
+         reference.getQueue().addLast(reference);
+      }
+      
+      callSynchronizations(SyncType.AFTER_COMMIT);
+   }
+   
+   public void rollback(PersistenceManager persistenceManager) throws Exception
+   {
+      callSynchronizations(SyncType.BEFORE_ROLLBACK);
+      
+      if (xid == null)
+      {
+         //1PC rollback - nothing to do
+      }
+      else
+      {
+         
+      }
+      callSynchronizations(SyncType.AFTER_ROLLBACK);
+      
+   }
+   
+   // Private -------------------------------------------------------------------
+   
+   private void playOperations(PersistenceTransaction tx, PersistenceManager persistenceManager) throws Exception
+   {
+      for (MessageReference reference: refsToAdd)
+      {
+         if (reference.getMessage().isReliable())
+         {
+            persistenceManager.addReference(tx, reference.getQueue(), reference);
+         }
+      }
+      
+      for (MessageReference reference: refsToRemove)
+      {
+         if (reference.getMessage().isReliable())
+         {
+            persistenceManager.removeReference(tx, reference.getQueue(), reference);
+         }
+      }
+   }
+   
+   private void callSynchronizations(SyncType type) throws Exception
+   {
+      for (TransactionSynchronization sync: synchronizations)
+      {
+         if (type == SyncType.BEFORE_COMMIT)
+         {
+            sync.beforeCommit();
+         }
+         else if (type == SyncType.AFTER_COMMIT)
+         {
+            sync.afterCommit();
+         }
+         else if (type == SyncType.BEFORE_ROLLBACK)
+         {
+            sync.beforeRollback();
+         }
+         else if (type == SyncType.AFTER_ROLLBACK);
+         {
+            sync.afterRollback();
+         }            
+      }
+   }
+   
+   // Inner Enums -------------------------------------------------------------------------------
+   
+   private enum SyncType
+   {
+      BEFORE_COMMIT, AFTER_COMMIT, BEFORE_ROLLBACK, AFTER_ROLLBACK;
+   }
+         
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,38 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.newcore.impl.bdbje;
+
+import org.jboss.messaging.util.Pair;
+
+/**
+ * 
+ * A CursorIntf
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJECursor
+{
+   Pair<Long, byte[]> getNext() throws Exception;
+   
+   void close() throws Exception;
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,26 @@
+package org.jboss.messaging.newcore.impl.bdbje;
+
+
+/**
+ * 
+ * A BDBJEDatabase
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJEDatabase
+{
+   void put(BDBJETransaction tx, long id, byte[] bytes, int offset, int length) throws Exception;
+   
+   void remove(BDBJETransaction tx, long id) throws Exception;
+   
+   void close() throws Exception;
+   
+   BDBJECursor cursor() throws Exception;   
+   
+   //Only used for testing
+   
+   long size() throws Exception;
+   
+   byte[] get(long id) throws Exception;
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,55 @@
+package org.jboss.messaging.newcore.impl.bdbje;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * 
+ * A BDBJEEnvironment
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJEEnvironment
+{
+   void start() throws Exception;
+   
+   void stop() throws Exception;
+   
+   BDBJETransaction createTransaction() throws Exception;
+   
+   BDBJEDatabase getDatabase(String databaseName) throws Exception;
+   
+   void setEnvironmentPath(String environmentPath);
+   
+   String getEnvironmentPath();
+      
+   void setTransacted(boolean transacted);
+   
+   boolean isTransacted();
+   
+   void setSyncVM(boolean sync);
+   
+   boolean isSyncVM();
+   
+   void setSyncOS(boolean sync);
+   
+   boolean isSyncOS();
+   
+   void setMemoryCacheSize(long size);
+   
+   long getMemoryCacheSize();
+   
+   void startWork(Xid xid) throws Exception;
+   
+   void endWork(Xid xid, boolean fail) throws Exception;
+   
+   void prepare(Xid xid) throws Exception;
+   
+   void commit(Xid xid) throws Exception;
+   
+   void rollback(Xid xid) throws Exception;
+   
+   List<Xid> getInDoubtXids() throws Exception;
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,545 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.newcore.impl.bdbje;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.impl.MessageImpl;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.PersistenceManager;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.util.Pair;
+
+import com.sleepycat.je.DatabaseEntry;
+
+/**
+ * 
+ * A PersistenceManager implementation that stores messages using Berkeley DB Java Edition.
+ * 
+ * We store message data in one BDB JE Database and MessageReference data in another.
+ * 
+ * Both Database instances are in the same BDB JE Environment. All database in a single environment
+ * share the same log file and can participate in the same transaction.
+ * 
+ * We store MessageReference data in a different Database since currently BDB JE is not optimised for partial
+ * updates - this means if we have a large message, then for every MessageReference we would have to update
+ * and store the entire message again - once for each reference as they are delivered / acknowldeged - this
+ * can give very poor performance.
+ * 
+ * TODO - Optimisation - If there is only one MessageReference per Message then we can store it in the same database
+ * 
+ * For XA functionality we do not write explicit transaction records as it proves somewhat quicker to rely
+ * on BDB JE's inbuilt XA transaction capability.
+ * 
+ * If messages are larger than <minLargeMessageSize> the messages are not stored in the BDB log but are stored
+ * as individual files in the <largeMessageRepositoryPath> directory referenced by a pointer from
+ * the log. This is because storing very large messages in a BDB log is not an efficient use of the log.
+ * 
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class BDBJEPersistenceManager implements PersistenceManager
+{
+   private static final Logger log = Logger.getLogger(BDBJEPersistenceManager.class);
+   
+   private static final boolean trace = log.isTraceEnabled();
+     
+   private static final int SIZE_LONG = 8;
+   
+   private static final int SIZE_INT = 4;
+   
+   private static final int SIZE_BYTE = 1;
+   
+   public static final int SIZE_REF_DATA =
+      SIZE_LONG + SIZE_INT + SIZE_LONG; // queue id + delivery count + scheduled delivery
+   
+   // type + expiration + timestamp + priority
+   public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE; 
+      
+   private static final byte[] ZERO_LENGTH_BYTE_ARRAY = new byte[0];
+   
+   public static final String MESSAGE_DB_NAME = "message";
+   
+   public static final String REFERENCE_DB_NAME = "reference";
+   
+   private boolean recovery;
+    
+   private BDBJEEnvironment environment;
+   
+   private BDBJEDatabase messageDB;
+   
+   private BDBJEDatabase refDB;
+   
+   private String largeMessageRepositoryPath;
+   
+   private int minLargeMessageSize;
+   
+   private String environmentPath;
+      
+   private boolean started;
+      
+   public BDBJEPersistenceManager(BDBJEEnvironment environment, String environmentPath)
+   {
+      this.environment = environment;
+      
+      this.environmentPath = environmentPath;
+   }
+   
+   // MessagingComponent implementation -----------------------------------------------------
+   
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+      
+      environment.setEnvironmentPath(environmentPath);
+      
+      environment.start();    
+      
+      messageDB = environment.getDatabase(MESSAGE_DB_NAME);
+      
+      refDB = environment.getDatabase(REFERENCE_DB_NAME);
+      
+      started = true;
+   }
+   
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;         
+      }
+      
+      messageDB.close();
+      
+      refDB.close();
+      
+      environment.stop();
+      
+      started = false;
+   }
+   
+   // PersistenceManager implementation ----------------------------------------------------------
+
+   public void commitMessage(Message message) throws Exception
+   {
+      BDBJETransaction tx = null;
+      
+      try
+      {      
+         tx = environment.createTransaction();
+         
+         internalCommitMessage(tx, message);
+         
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         try
+         {
+            if (tx != null)
+            {
+               tx.rollback();
+            }
+         }
+         catch (Throwable ignore)
+         {
+            if (trace) { log.trace("Failed to rollback", ignore); }
+         }
+      }            
+   }
+   
+   public void commitMessages(List<Message> messages)
+   {
+      BDBJETransaction tx = null;
+      
+      try
+      {      
+         tx = environment.createTransaction();  
+         
+         for (Message message: messages)
+         {
+            internalCommitMessage(tx, message);
+         }
+         
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         try
+         {
+            if (tx != null)
+            {
+               tx.rollback();
+            }
+         }
+         catch (Throwable ignore)
+         {
+            if (trace) { log.trace("Failed to rollback", ignore); }
+         }
+      }      
+   }
+   
+   public void deleteReferences(List<MessageReference> references)
+   {
+      BDBJETransaction tx = null;
+      
+      try
+      {      
+         tx = environment.createTransaction();
+         
+         for (MessageReference ref: references)
+         {
+            internalDeleteReference(tx, ref);
+         }
+         
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         try
+         {
+            if (tx != null)
+            {
+               tx.rollback();
+            }
+         }
+         catch (Throwable ignore)
+         {
+            if (trace) { log.trace("Failed to rollback", ignore); }
+         }
+      }
+   }
+
+   public void deleteReference(MessageReference reference)
+   {
+      BDBJETransaction tx = null;
+      
+      try
+      {      
+         tx = environment.createTransaction();
+         
+         internalDeleteReference(tx, reference);
+         
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         try
+         {
+            if (tx != null)
+            {
+               tx.rollback();
+            }
+         }
+         catch (Throwable ignore)
+         {
+            if (trace) { log.trace("Failed to rollback", ignore); }
+         }
+      }
+   }
+      
+   public List<Xid> getInDoubtXids() throws Exception
+   {
+      return environment.getInDoubtXids();
+   }
+
+   public void setInRecoveryMode(boolean recoveryMode)
+   {
+      this.recovery = recoveryMode;
+   }
+   
+   public boolean isInRecoveryMode()
+   {
+      return recovery;
+   }
+   
+   public void loadQueues(Map<Long, Queue> queues) throws Exception
+   {
+      BDBJECursor cursorMessage = this.messageDB.cursor();
+      
+      BDBJECursor cursorRef = this.refDB.cursor();
+      
+      Pair<Long, byte[]> messagePair;
+      
+      Pair<Long, byte[]> refPair;
+      
+      while ((messagePair = cursorMessage.getNext()) != null)
+      {
+         refPair = cursorRef.getNext();
+         
+         if (refPair == null)
+         {
+            throw new IllegalStateException("Message and ref data out of sync");
+         }
+                 
+         long id = messagePair.a;
+         
+         byte[] bytes = messagePair.b;
+               
+         ByteBuffer buffer = ByteBuffer.wrap(bytes);
+         
+         int type = buffer.getInt();
+         
+         long expiration = buffer.getLong();
+         
+         long timestamp = buffer.getLong();
+         
+         byte priority = buffer.get();
+         
+         int headerSize = buffer.getInt();
+         
+         //TODO we can optimise this to prevent a copy - let the message use a window on the byte[]
+         
+         byte[] headers = new byte[headerSize];
+         
+         buffer.get(headers);
+         
+         int payloadSize = buffer.getInt();
+         
+         byte[] payload = new byte[payloadSize];
+         
+         buffer.get(payload);
+         
+         Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
+                                           headers, payload);
+         
+         //Now the ref data
+         
+         byte[] refBytes = refPair.b;
+         
+         buffer = ByteBuffer.wrap(refBytes);
+         
+         while (buffer.hasRemaining())
+         {
+            long queueID = buffer.getLong();
+            
+            int deliveryCount = buffer.getInt();
+            
+            long scheduledDeliveryTime = buffer.getLong();
+            
+            Queue queue = queues.get(queueID);
+            
+            if (queue == null)
+            {
+               //Ok - queue is not deployed
+            }
+            else
+            {
+               MessageReference reference = message.createReference(queue);
+               
+               reference.setDeliveryCount(deliveryCount);
+               
+               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+               
+               queue.addLast(reference);
+            }    
+         }         
+      }     
+   }
+
+   public void prepareMessages(Xid xid, List<Message> messages) throws Exception
+   {      
+      environment.startWork(xid);
+      
+      try
+      {         
+         for (Message message: messages)
+         {
+            internalCommitMessage(null, message);
+         }
+      }
+      catch (Exception e)
+      {
+         try
+         {
+            environment.endWork(xid, true);
+         }
+         catch (Throwable ignore)
+         {
+            if (trace) { log.trace("Failed to end", ignore); }
+         }
+         
+         throw e;
+      }
+      
+      environment.endWork(xid, false);
+      
+      environment.prepare(xid);
+   }
+   
+   public void commitPreparedMessages(Xid xid) throws Exception
+   {
+      environment.commit(xid);
+   }
+
+   public void unprepareMessages(Xid xid, List<Message> messages) throws Exception
+   {
+      environment.rollback(xid);      
+   }
+   
+   public void updateDeliveryCount(Queue queue, MessageReference ref) throws Exception
+   {
+      //TODO - optimise this scan
+      
+      int pos = ref.getMessage().getReferences().indexOf(ref);
+      
+      int offset = pos * SIZE_REF_DATA + SIZE_LONG;
+      
+      byte[] bytes = new byte[SIZE_INT];
+      
+      ByteBuffer buff = ByteBuffer.wrap(bytes);
+      
+      buff.putInt(ref.getDeliveryCount());
+      
+      refDB.put(null, ref.getMessage().getMessageID(), bytes, offset, SIZE_INT);
+   }
+   
+   // Public ----------------------------------------------------------------------------------
+   
+   public String getLargeMessageRepositoryPath()
+   {
+      return largeMessageRepositoryPath;
+   }
+
+   public void setLargeMessageRepository(String largeMessageRepositoryPath)
+   {
+      this.largeMessageRepositoryPath = largeMessageRepositoryPath;
+   }
+   
+   public int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+   
+   public void setMinLargeMessageSize(int minLargeMessageSize)
+   {
+      this.minLargeMessageSize = minLargeMessageSize; 
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   public void setStarted(boolean started)
+   {
+      this.started = started;
+   }
+   
+   // Private ---------------------------------------------------------------------------------
+   
+   
+   private void internalCommitMessage(BDBJETransaction tx, Message message) throws Exception
+   {
+      //First store the message
+      
+      byte[] headers = message.getHeadersAsByteArray();
+      
+      int headersLength = headers.length;
+      
+      byte[] payload = message.getPayload();
+      
+      int payloadLength = payload == null ? 0 : payload.length;
+      
+      //TODO - avoid copying by having message already store it's byte representation or do partial
+      //update in BDB
+      byte[] bytes = new byte[SIZE_FIELDS + 2 * SIZE_INT + headersLength + payloadLength];
+               
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      
+      //Put the fields
+      buffer.putInt(message.getType());
+      buffer.putLong(message.getExpiration());
+      buffer.putLong(message.getTimestamp());
+      buffer.put(message.getPriority());  
+      
+      buffer.putInt(headersLength);
+      buffer.put(headers);    
+      
+      buffer.putInt(payloadLength);
+      if (payload != null)
+      {
+         buffer.put(payload);
+      }
+       
+      //Now the ref(s)
+      
+      byte[] refBytes = new byte[message.getReferences().size() * SIZE_REF_DATA];
+      
+      ByteBuffer buff = ByteBuffer.wrap(refBytes);
+      
+      for (MessageReference ref: message.getReferences())
+      {
+         buff.putLong(ref.getQueue().getID());
+         buff.putInt(ref.getDeliveryCount());
+         buff.putLong(ref.getScheduledDeliveryTime());
+      }
+      
+      try
+      {         
+         messageDB.put(tx, message.getMessageID(), bytes, 0, bytes.length);
+      }
+      catch (Throwable t)
+      {
+         t.printStackTrace();
+      }
+      
+      refDB.put(tx, message.getMessageID(), refBytes, 0, refBytes.length);
+   }
+   
+   private void internalDeleteReference(BDBJETransaction tx, MessageReference reference) throws Exception
+   {
+      Message message = reference.getMessage();
+      
+      boolean deleteAll = message.getReferences().size() == 1;
+       
+      if (deleteAll)
+      {
+         refDB.remove(tx, message.getMessageID());
+         
+         messageDB.remove(tx, message.getMessageID());
+         
+         message.getReferences().remove(0);
+      }
+      else
+      {            
+         //TODO - this can be optimised so not to scan using indexOf
+         
+         int pos = message.getReferences().indexOf(reference);
+         
+         int offset = pos * SIZE_REF_DATA;
+         
+         refDB.put(tx, message.getMessageID(), ZERO_LENGTH_BYTE_ARRAY, offset, SIZE_REF_DATA);
+         
+         message.getReferences().remove(pos);
+      }                         
+   }
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,15 @@
+package org.jboss.messaging.newcore.impl.bdbje;
+
+/**
+ * 
+ * A BDBJETransaction
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJETransaction
+{
+   public void commit() throws Exception;
+   
+   public void rollback() throws Exception;     
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,161 @@
+package org.jboss.messaging.newcore.impl.bdbje.integration;
+
+import org.jboss.messaging.newcore.impl.bdbje.BDBJECursor;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJETransaction;
+import org.jboss.messaging.util.Pair;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
+/**
+ * 
+ * A RealBDBJEDatabase
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealBDBJEDatabase implements BDBJEDatabase
+{  
+   private Database database;
+   
+   RealBDBJEDatabase(Database database)
+   {
+      this.database = database;
+   }
+
+   // BDBJEDatabase implementation ------------------------------------------
+   
+   public void put(BDBJETransaction tx, long id, byte[] bytes, int offset, int length) throws Exception
+   {
+      DatabaseEntry key = createKey(id);
+
+      DatabaseEntry value = new DatabaseEntry();
+
+      if (offset != 0)
+      {
+         value.setPartial(offset, length, true);
+      }
+
+      value.setData(bytes);
+
+      Transaction bdbTx = getBDBTx(tx);
+
+      database.put(bdbTx, key, value);
+   }
+
+   public void remove(BDBJETransaction tx, long id) throws Exception
+   {
+      DatabaseEntry key = createKey(id);
+
+      Transaction bdbTx = getBDBTx(tx);
+
+      database.delete(bdbTx, key);
+   }
+
+   public BDBJECursor cursor() throws Exception
+   {
+      return new RealBDBJECursor();
+   }
+
+   public void close() throws Exception
+   {
+      database.close();
+   }
+   
+   // For testing
+   
+   public byte[] get(long id) throws Exception
+   {
+      DatabaseEntry key = createKey(id);
+      
+      DatabaseEntry data = new DatabaseEntry();
+      
+      if (database.get(null, key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS)
+      {
+         return null;
+      }
+      else
+      {
+         return data.getData();
+      }
+   }
+
+   public long size() throws Exception
+   {
+      return database.count();
+   }
+      
+   // Private ----------------------------------------------------------------------
+   
+   private DatabaseEntry createKey(long id)
+   {
+      DatabaseEntry key = new DatabaseEntry();
+      
+      EntryBinding keyBinding = new LongBinding();
+      
+      keyBinding.objectToEntry(id, key);
+      
+      return key;
+   }
+   
+   private Transaction getBDBTx(BDBJETransaction tx)
+   {
+      Transaction bdbTx = null;
+      
+      if (tx != null)
+      {
+         RealBDBJETransaction bdbJETx = (RealBDBJETransaction)tx;
+         
+         bdbTx = bdbJETx.getTransaction();
+      }
+      
+      return bdbTx;
+   }
+   
+   // Inner classes ---------------------------------------------------------------------
+   
+   private class RealBDBJECursor implements BDBJECursor
+   {
+      private Cursor cursor;
+      
+      RealBDBJECursor() throws Exception
+      {
+         cursor = database.openCursor(null, null);
+      }
+      
+      public Pair<Long, byte[]> getNext() throws Exception
+      {
+         DatabaseEntry key = new DatabaseEntry();
+       
+         DatabaseEntry data = new DatabaseEntry();
+ 
+         if (cursor.getNext(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+         {            
+            long id = LongBinding.entryToLong(key);
+            
+            byte[] bytes = data.getData();
+            
+            Pair<Long, byte[]> pair = new Pair<Long, byte[]>(id, bytes);
+            
+            return pair;            
+         }
+         else
+         {
+            return null;
+         }
+      }
+      
+      public void close() throws Exception
+      {
+         cursor.close();
+      }
+   }
+
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,390 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl.bdbje.integration;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEEnvironment;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJETransaction;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.XAEnvironment;
+
+/**
+ * 
+ * A RealBDBJEEnvironment
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealBDBJEEnvironment implements BDBJEEnvironment
+{
+   /**
+    * The actual DB environment
+    */
+   private XAEnvironment environment;
+
+   /**
+    * The path of the environment
+    */
+   private String environmentPath;
+
+   /**
+    * Is the environment transacted?
+    */
+   private boolean transacted;
+
+   /**
+    * Do we sync OS buffers to disk on transaction commit?
+    */
+   private boolean syncOS;
+
+   /**
+    * Do we sync to the OS on transaction commit ?
+    */
+   private boolean syncVM;     
+
+   /** 
+    * Memory cache size in bytes, or -1 to use BDB default
+    * 
+    */
+   private long memoryCacheSize = -1;
+
+   /**
+    * Are we started?
+    */
+   private volatile boolean started;
+
+   /**
+    * Are we in debug mode? Used in testing
+    */
+   private boolean debug;
+
+   /**
+    * Used for debug only to ensure the XA operations are called in the right order
+    */
+   private Map<Thread, ThreadTXStatus> threadTXStatuses;
+
+   public RealBDBJEEnvironment(boolean debug)
+   {
+      this.debug = debug;
+
+      if (debug)
+      {
+         threadTXStatuses = new ConcurrentHashMap<Thread, ThreadTXStatus>();
+      }
+   }
+
+   public synchronized void start() throws Exception
+   {      
+      if (started)
+      {
+         throw new IllegalStateException("Already started");
+      }
+      if (environmentPath == null)
+      {
+         throw new IllegalStateException("environmentPath has not been specified");
+      }
+
+      EnvironmentConfig envConfig = new EnvironmentConfig();
+
+      if (memoryCacheSize != -1)
+      {
+         envConfig.setCacheSize(memoryCacheSize);
+      }
+
+      envConfig.setTxnNoSync(!syncOS);
+
+      envConfig.setTxnWriteNoSync(!syncVM);
+
+      envConfig.setAllowCreate(true);
+
+      envConfig.setTransactional(transacted);
+
+      environment = new XAEnvironment(new File(environmentPath), envConfig);
+
+      DatabaseConfig dbConfig = new DatabaseConfig();
+
+      dbConfig.setTransactional(transacted);
+
+      dbConfig.setAllowCreate(true);
+
+      started = true;      
+   }
+
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("Not started");
+      }
+
+      try
+      {
+         environment.close();
+      }
+      catch (Exception ignore)
+      {
+         //Environment close might fail since there are open transactions - this is ok
+      }
+
+      started = false;      
+   }
+
+   public BDBJETransaction createTransaction() throws Exception
+   {
+      return new RealBDBJETransaction(environment.beginTransaction(null, null));
+   }
+
+   public BDBJEDatabase getDatabase(String databaseName) throws Exception
+   {
+      DatabaseConfig dbConfig = new DatabaseConfig();
+
+      dbConfig.setTransactional(transacted);
+
+      dbConfig.setAllowCreate(true);
+
+      Database database = environment.openDatabase(null, databaseName, dbConfig); 
+
+      BDBJEDatabase db = new RealBDBJEDatabase(database);
+
+      return db;
+   }
+
+   public String getEnvironmentPath()
+   {
+      return this.environmentPath;
+   }
+
+   public long getMemoryCacheSize()
+   {
+      return this.memoryCacheSize;
+   }
+
+   public boolean isSyncOS()
+   {
+      return this.syncOS;
+   }
+
+   public boolean isSyncVM()
+   {
+      return this.syncVM;
+   }
+
+   public boolean isTransacted()
+   {
+      return this.transacted;
+   }
+
+   public void setEnvironmentPath(String environmentPath)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set EnvironmentPath when started");
+      }
+      this.environmentPath = environmentPath;
+   }
+
+   public void setMemoryCacheSize(long size)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set MemoryCacheSize when started");
+      }
+      this.memoryCacheSize = size;
+   }
+
+   public void setSyncOS(boolean sync)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set SyncOS when started");
+      }
+      this.syncOS = sync;
+   }
+
+   public void setSyncVM(boolean sync)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set SyncVM when started");
+      }
+      this.syncVM = sync;
+   }
+
+   public void setTransacted(boolean transacted)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set Transacted when started");
+      }
+      this.transacted = transacted;
+   }
+
+   public List<Xid> getInDoubtXids() throws Exception
+   {
+      Xid[] xids = environment.recover(XAResource.TMSTARTRSCAN);
+
+      List<Xid> list = Arrays.asList(xids);
+
+      return list;
+   }
+
+   public void startWork(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkNoStatus();
+
+         threadTXStatuses.put(Thread.currentThread(), new ThreadTXStatus(xid));
+      }
+
+      environment.start(xid, XAResource.TMNOFLAGS);
+   }
+
+   public void endWork(Xid xid, boolean failed) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.IN_WORK);
+
+         setXAState(XAState.DONE_WORK);
+      }
+
+      environment.end(xid, failed ? XAResource.TMFAIL : XAResource.TMSUCCESS);
+   }
+     
+   public void prepare(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.DONE_WORK);
+
+         setXAState(XAState.PREPARE_CALLED);
+      }
+
+      environment.prepare(xid);
+   }
+
+   public void commit(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.PREPARE_CALLED);
+
+         threadTXStatuses.remove(Thread.currentThread());
+      }
+
+      environment.commit(xid, false);       
+   }   
+
+   public void rollback(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.PREPARE_CALLED);
+
+         threadTXStatuses.remove(Thread.currentThread());
+      }
+
+      environment.rollback(xid);
+   }
+
+   // Private -------------------------------------------------------------------------
+
+   /*
+    * Used for debug only
+    */
+   private ThreadTXStatus getTxStatus()
+   {
+      return threadTXStatuses.get(Thread.currentThread());
+   }
+   
+   private void checkXAState(Xid xid, XAState state)
+   {
+      ThreadTXStatus status = getTxStatus();
+
+      if (status == null)
+      {
+         throw new IllegalStateException("Not started any xa work");
+      }
+      
+      if (!state.equals(status.state))
+      {
+         throw new IllegalStateException("Invalid XAState expected " + state + " got " + status.state);
+      }
+      
+      if (xid != status.implicitXid)
+      {
+         throw new IllegalStateException("Wrong xid");
+      }
+   }
+   
+   private void checkNoStatus()
+   {
+      ThreadTXStatus status = getTxStatus();
+
+      if (status != null)
+      {
+         throw new IllegalStateException("XA status should not exist");
+      }
+   }
+   
+   private void setXAState(XAState state)
+   {
+      threadTXStatuses.get(Thread.currentThread()).state = state;
+   }
+   
+
+   // Inner classes --------------------------------------------------------------------
+
+   private enum XAState
+   {
+      NOT_STARTED, IN_WORK, DONE_WORK, PREPARE_CALLED
+   }
+   
+   /*
+    * Used for debug only
+    */
+   private class ThreadTXStatus
+   {
+      ThreadTXStatus(Xid xid)
+      {
+         this.implicitXid = xid;      
+      }
+
+      Xid implicitXid;
+
+      XAState state = XAState.IN_WORK;
+   }
+
+}

Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java	2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,38 @@
+package org.jboss.messaging.newcore.impl.bdbje.integration;
+
+import org.jboss.messaging.newcore.impl.bdbje.BDBJETransaction;
+
+import com.sleepycat.je.Transaction;
+
+/**
+ * 
+ * A RealBDBJETransaction
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealBDBJETransaction implements BDBJETransaction
+{
+   private Transaction transaction;
+   
+   RealBDBJETransaction(Transaction transaction)
+   {
+      this.transaction = transaction;
+   }
+      
+   public void commit() throws Exception
+   {
+      transaction.commit();
+   }
+
+   public void rollback() throws Exception
+   {
+      transaction.abort();
+   }
+   
+   public Transaction getTransaction()
+   {
+      return transaction;
+   }
+
+}




More information about the jboss-cvs-commits mailing list