[jboss-cvs] JBoss Messaging SVN: r3415 - in branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore: impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 5 14:46:16 EST 2007
Author: timfox
Date: 2007-12-05 14:46:16 -0500 (Wed, 05 Dec 2007)
New Revision: 3415
Added:
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java
Log:
Missing files
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,420 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.util.StreamUtils;
+
+/**
+ * A concrete implementation of a message
+ *
+ * All messages handled by JBM servers are of this type
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2740 $</tt>
+ *
+ * Note this class is only serializable so messages can be returned from JMX operations
+ * e.g. listAllMessages.
+ *
+ * For normal message transportation serialization is not used
+ *
+ * $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
+ */
+public class MessageImpl implements Message
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(MessageImpl.class);
+
+ // Attributes ----------------------------------------------------
+
+ private boolean trace = log.isTraceEnabled();
+
+ private long messageID;
+
+ private int type;
+
+ private boolean reliable;
+
+ /** GMT milliseconds at which this message expires. 0 means never expires * */
+ private long expiration;
+
+ private long timestamp;
+
+ private Map<String, Object> headers;
+
+ private byte priority;
+
+ //The payload of MessageImpl instances is opaque
+ private byte[] payload;
+
+ //We keep track of the persisted references for this message
+ private transient List<MessageReference> references = new ArrayList<MessageReference>();
+
+ private String destination;
+
+ private String connectionID;
+
+ // Constructors --------------------------------------------------
+
+ /*
+ * Construct a message for deserialization or streaming
+ */
+ public MessageImpl()
+ {
+ }
+
+   // Creates a message with the given attributes and an empty, mutable header map
+   public MessageImpl(long messageID, int type, boolean reliable, long expiration, long timestamp, byte priority)
+   {
+      this.messageID = messageID;
+      this.type = type; // was never stored, so getType() always returned 0
+      this.reliable = reliable;
+      this.expiration = expiration;
+      this.timestamp = timestamp;
+      this.priority = priority;
+      this.headers = new HashMap<String, Object>();
+   }
+
+ /*
+ * Construct a MessageImpl from storage
+ */
+   public MessageImpl(long messageID, int type, boolean reliable, long expiration,
+                      long timestamp, byte priority, byte[] headers, byte[] payload)
+      throws Exception
+   {
+      this.messageID = messageID;
+      this.type = type; // was never stored, so getType() always returned 0 after a reload
+      this.reliable = reliable;
+      this.expiration = expiration;
+      this.timestamp = timestamp;
+      this.priority = priority;
+      this.payload = payload;
+
+      if (headers == null)
+      {
+         // Nothing was stored for the headers - start with an empty map
+         this.headers = new HashMap<String, Object>();
+      }
+      else
+      {
+         //TODO keep headers opaque on server
+         DataInputStream dais = new DataInputStream(new ByteArrayInputStream(headers));
+
+         this.headers = StreamUtils.readMap(dais, true);
+
+         dais.close();
+      }
+   }
+
+   /**
+    * Copy constructor. The headers are copied into a new map; the payload array is shared.
+    * @param other the message to copy (its destination, connectionID and references are not copied)
+    */
+   public MessageImpl(MessageImpl other)
+   {
+      this.messageID = other.messageID;
+      this.type = other.type; // previously omitted, so every copy reported type 0
+      this.reliable = other.reliable;
+      this.expiration = other.expiration;
+      this.timestamp = other.timestamp;
+      this.priority = other.priority;
+      this.headers = new HashMap<String, Object>(other.headers);
+      this.payload = other.payload;
+   }
+
+ // Message implementation ----------------------------------------
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public void setMessageID(long id)
+ {
+ this.messageID = id;
+ }
+
+ public String getDestination()
+ {
+ return destination;
+ }
+
+ public void setDestination(String destination)
+ {
+ this.destination = destination;
+ }
+
+ public int getType()
+ {
+ return type;
+ }
+
+ public boolean isReliable()
+ {
+ return reliable;
+ }
+
+ public void setReliable(boolean reliable)
+ {
+ this.reliable = reliable;
+ }
+
+ public long getExpiration()
+ {
+ return expiration;
+ }
+
+ public void setExpiration(long expiration)
+ {
+ this.expiration = expiration;
+ }
+
+ public long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ public Object putHeader(String name, Object value)
+ {
+ return headers.put(name, value);
+ }
+
+ public Object getHeader(String name)
+ {
+ return headers.get(name);
+ }
+
+ public Object removeHeader(String name)
+ {
+ return headers.remove(name);
+ }
+
+ public boolean containsHeader(String name)
+ {
+ return headers.containsKey(name);
+ }
+
+ public Map<String, Object> getHeaders()
+ {
+ return headers;
+ }
+
+ public byte getPriority()
+ {
+ return priority;
+ }
+
+ public void setPriority(byte priority)
+ {
+ this.priority = priority;
+ }
+
+ // TODO - combine with getPayloadAsByteArray to get one big blob
+ public byte[] getHeadersAsByteArray() throws Exception
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+
+ DataOutputStream oos = new DataOutputStream(bos);
+
+ StreamUtils.writeMap(oos, headers, true);
+
+ oos.close();
+
+ return bos.toByteArray();
+ }
+
+ public byte[] getPayload()
+ {
+ return payload;
+ }
+
+ public void setPayload(byte[] payload)
+ {
+ this.payload = payload;
+ }
+
+ public String getConnectionID()
+ {
+ return connectionID;
+ }
+
+ public void setConnectionID(String connectionID)
+ {
+ this.connectionID = connectionID;
+ }
+
+   /*
+    * An expiration of 0 means the message never expires; otherwise the message
+    * is expired as soon as the current time reaches the expiration time.
+    */
+   public boolean isExpired()
+   {
+      if (expiration != 0)
+      {
+         long elapsed = System.currentTimeMillis() - expiration;
+
+         return elapsed >= 0;
+      }
+
+      return false;
+   }
+
+ public MessageReference createReference(Queue queue)
+ {
+ MessageReference ref = new MessageReferenceImpl(this, queue);
+
+ references.add(ref);
+
+ return ref;
+ }
+
+ public List<MessageReference> getReferences()
+ {
+ return references;
+ }
+
+ public Message copy()
+ {
+ return new MessageImpl(this);
+ }
+
+ // Public --------------------------------------------------------
+
+   //Two messages are equal when they are both MessageImpl instances carrying
+   //the same message id; no other field takes part in the comparison
+   public boolean equals(Object o)
+   {
+      if (this == o)
+      {
+         return true;
+      }
+
+      if (o instanceof MessageImpl)
+      {
+         return ((MessageImpl) o).messageID == this.messageID;
+      }
+
+      return false;
+   }
+
+ public int hashCode()
+ {
+ return (int) ((this.messageID >>> 32) ^ this.messageID);
+ }
+
+ public String toString()
+ {
+ return "M[" + messageID + "]";
+ }
+
+ // Streamable implementation ---------------------------------
+
+ // Writes every field in the fixed order that read(DataInputStream) consumes them
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeLong(messageID);
+ // NOTE: writeUTF throws NullPointerException if destination was never set
+ out.writeUTF(destination);
+
+ out.writeInt(type);
+
+ out.writeBoolean(reliable);
+
+ out.writeLong(expiration);
+
+ out.writeLong(timestamp);
+
+ StreamUtils.writeMap(out, headers, true);
+
+ out.writeByte(priority);
+ // The payload is length-prefixed; a null payload is written as length 0
+ if (payload != null)
+ {
+ out.writeInt(payload.length);
+ out.write(payload);
+ }
+ else
+ {
+ out.writeInt(0);
+ }
+ }
+
+ // Restores every field, in the same order write(DataOutputStream) emits them
+ public void read(DataInputStream in) throws Exception
+ {
+ messageID = in.readLong();
+
+ destination = in.readUTF();
+
+ type = in.readInt();
+
+ reliable = in.readBoolean();
+
+ expiration = in.readLong();
+
+ timestamp = in.readLong();
+
+ headers = StreamUtils.readMap(in, true);
+
+ priority = in.readByte();
+
+ int length = in.readInt();
+ // A length of 0 is read back as a null payload, so an empty payload does not survive a round trip
+ if (length == 0)
+ {
+ // no payload
+ payload = null;
+ }
+ else
+ {
+ payload = new byte[length];
+ in.readFully(payload);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageReferenceImpl.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,132 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+
+/**
+ * Implementation of a MessageReference
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>1.3</tt>
+ *
+ * MessageReferenceImpl.java,v 1.3 2006/02/23 17:45:57 timfox Exp
+ */
+public class MessageReferenceImpl implements MessageReference
+{
+ private static final Logger log = Logger.getLogger(MessageReferenceImpl.class);
+
+ // Attributes ----------------------------------------------------
+
+ private boolean trace = log.isTraceEnabled();
+
+ private int deliveryCount;
+
+ private long scheduledDeliveryTime;
+
+ private Message message;
+
+ private Queue queue;
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Required by externalization.
+ */
+ public MessageReferenceImpl()
+ {
+ if (trace) { log.trace("Creating using default constructor"); }
+ }
+
+ public MessageReferenceImpl(MessageReferenceImpl other, Queue queue)
+ {
+ this.deliveryCount = other.deliveryCount;
+
+ this.scheduledDeliveryTime = other.scheduledDeliveryTime;
+
+ this.message = other.message;
+
+ this.queue = queue;
+ }
+
+ protected MessageReferenceImpl(Message message, Queue queue)
+ {
+ this.message = message;
+
+ this.queue = queue;
+ }
+
+ // MessageReference implementation -------------------------------
+
+ public MessageReference copy(Queue queue)
+ {
+ return new MessageReferenceImpl(this, queue);
+ }
+
+ public int getDeliveryCount()
+ {
+ return deliveryCount;
+ }
+
+ public void setDeliveryCount(int deliveryCount)
+ {
+ this.deliveryCount = deliveryCount;
+ }
+
+ public long getScheduledDeliveryTime()
+ {
+ return scheduledDeliveryTime;
+ }
+
+ public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ {
+ this.scheduledDeliveryTime = scheduledDeliveryTime;
+ }
+
+ public Message getMessage()
+ {
+ return message;
+ }
+
+ public Queue getQueue()
+ {
+ return queue;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String toString()
+ {
+ return "Reference[" + getMessage().getMessageID() + "]:" + (getMessage().isReliable() ? "RELIABLE" : "NON-RELIABLE");
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/QueueImpl.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,508 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+
+import org.jboss.jms.server.MessagingTimeoutFactory;
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Consumer;
+import org.jboss.messaging.newcore.intf.DistributionPolicy;
+import org.jboss.messaging.newcore.intf.Filter;
+import org.jboss.messaging.newcore.intf.HandleStatus;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
+import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedListImpl;
+import org.jboss.util.timeout.Timeout;
+import org.jboss.util.timeout.TimeoutTarget;
+
+/**
+ *
+ * A standard non clustered Queue implementation
+ *
+ * TODO use Java 5 concurrent queue
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class QueueImpl implements Queue
+{
+ private static final Logger log = Logger.getLogger(QueueImpl.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ protected long id;
+
+ protected int maxSize = -1;
+
+ protected Filter filter;
+
+ protected PriorityLinkedList<MessageReference> messageReferences;
+
+ protected List<Consumer> consumers;
+
+ protected Set<Timeout> scheduledTimeouts;
+
+ protected DistributionPolicy distributionPolicy;
+
+ protected boolean direct;
+
+ protected boolean promptDelivery;
+
+ private int pos;
+
+ public QueueImpl(long id)
+ {
+ this(id, null);
+ }
+
+ public QueueImpl(long id, Filter filter)
+ {
+ this.id = id;
+
+ this.filter = filter;
+
+ //TODO - use a wait free concurrent queue
+ messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
+
+ consumers = new ArrayList<Consumer>();
+
+ scheduledTimeouts = new HashSet<Timeout>();
+
+ distributionPolicy = new RoundRobinDistributionPolicy();
+
+ direct = true;
+ }
+
+ public QueueImpl(long id, Filter filter, int maxSize)
+ {
+ this(id, filter);
+
+ this.maxSize = maxSize;
+ }
+
+ // Queue implementation -------------------------------------------------------------------
+
+ public synchronized HandleStatus addLast(MessageReference ref)
+ {
+ return add(ref, false);
+ }
+
+ public synchronized HandleStatus addFirst(MessageReference ref)
+ {
+ return add(ref, true);
+ }
+
+
+ /*
+ * Attempt to deliver all the messages in the queue
+ * @see org.jboss.messaging.newcore.intf.Queue#deliver()
+ */
+ public synchronized void deliver()
+ {
+ MessageReference reference;
+ // Null while references are taken from the head; set once the head reference matched no consumer
+ ListIterator<MessageReference> iterator = null;
+
+ while (true)
+ {
+ if (iterator == null)
+ {
+ reference = messageReferences.peekFirst();
+ }
+ else
+ {
+ if (iterator.hasNext())
+ {
+ reference = iterator.next();
+ }
+ else
+ {
+ reference = null;
+ }
+ }
+ // Nothing left to offer: either the queue is empty or the scan reached its end
+ if (reference == null)
+ {
+ if (iterator == null)
+ {
+ //We delivered all the messages - go into direct delivery
+ direct = true;
+ }
+ return;
+ }
+
+ HandleStatus status = deliver(reference);
+ // HANDLED: drop the reference from the queue through whichever cursor produced it
+ if (status == HandleStatus.HANDLED)
+ {
+ if (iterator == null)
+ {
+ messageReferences.removeFirst();
+ }
+ else
+ {
+ iterator.remove();
+ }
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ //All consumers busy - give up
+ break;
+ }
+ else if (status == HandleStatus.NO_MATCH && iterator == null)
+ {
+ //Consumers not all busy - but filter not accepting - iterate back through the queue
+ iterator = messageReferences.iterator();
+ }
+ }
+ }
+
+ public synchronized void addConsumer(Consumer consumer)
+ {
+ consumers.add(consumer);
+ }
+
+ public synchronized boolean removeConsumer(Consumer consumer)
+ {
+ boolean removed = consumers.remove(consumer);
+
+ if (pos == consumers.size())
+ {
+ pos = 0;
+ }
+
+ if (consumers.isEmpty())
+ {
+ promptDelivery = false;
+ }
+
+ return removed;
+ }
+
+ public synchronized boolean deliverScheduled(MessageReference reference)
+ {
+ //TODO
+
+ return false;
+ }
+
+ public synchronized int getConsumerCount()
+ {
+ return consumers.size();
+ }
+
+ public synchronized List<MessageReference> list(Filter filter)
+ {
+ if (filter == null)
+ {
+ return new ArrayList<MessageReference>(messageReferences.getAll());
+ }
+ else
+ {
+ ArrayList<MessageReference> list = new ArrayList<MessageReference>();
+
+ for (MessageReference ref: messageReferences.getAll())
+ {
+ if (filter.match(ref.getMessage()))
+ {
+ list.add(ref);
+ }
+ }
+
+ return list;
+ }
+ }
+
+ public synchronized void removeAllReferences()
+ {
+ messageReferences.clear();
+
+ if (!this.scheduledTimeouts.isEmpty())
+ {
+ Set<Timeout> clone = new HashSet<Timeout>(scheduledTimeouts);
+
+ for (Timeout timeout: clone)
+ {
+ timeout.cancel();
+ }
+
+ scheduledTimeouts.clear();
+ }
+ }
+
+ public long getID()
+ {
+ return id;
+ }
+
+ public synchronized Filter getFilter()
+ {
+ return filter;
+ }
+
+ public synchronized void setFilter(Filter filter)
+ {
+ this.filter = filter;
+ }
+
+ public synchronized int getMessageCount()
+ {
+ return messageReferences.size();
+ }
+
+ public synchronized int getScheduledCount()
+ {
+ return scheduledTimeouts.size();
+ }
+
+ public synchronized int getMaxSize()
+ {
+ return maxSize;
+ }
+
+   public synchronized void setMaxSize(int maxSize)
+   {
+      int num = messageReferences.size() + scheduledTimeouts.size();
+      //-1 means unbounded (see checkFull()), so it must never be rejected as too small
+      if (maxSize != -1 && maxSize < num)
+      {
+         throw new IllegalArgumentException("Cannot set maxSize to " + maxSize + " since there are " + num + " refs");
+      }
+      this.maxSize = maxSize;
+   }
+
+ public synchronized DistributionPolicy getDistributionPolicy()
+ {
+ return distributionPolicy;
+ }
+
+ public synchronized void setDistributionPolicy(DistributionPolicy distributionPolicy)
+ {
+ this.distributionPolicy = distributionPolicy;
+ }
+
+ // Private ------------------------------------------------------------------------------
+
+ // Accepts a reference into the queue; 'first' selects addFirst over addLast when it has to be stored
+ private HandleStatus add(MessageReference ref, boolean first)
+ {
+ if (!checkFull())
+ {
+ return HandleStatus.BUSY;
+ }
+ // The queue's own filter (if any) must accept the message
+ if (filter != null)
+ {
+ if (!filter.match(ref.getMessage()))
+ {
+ return HandleStatus.NO_MATCH;
+ }
+ }
+ // A reference scheduled for the future is handed to checkAndSchedule and not queued here
+ if (!checkAndSchedule(ref))
+ {
+ boolean add = false;
+
+ if (direct)
+ {
+ //Deliver directly
+ HandleStatus status = deliver(ref);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ //Ok
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ add = true;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ add = true;
+
+ promptDelivery = true;
+ }
+
+ if (add)
+ {
+ direct = false;
+ }
+ }
+ else
+ {
+ add = true;
+ }
+
+ if (add)
+ {
+ if (first)
+ {
+ messageReferences.addFirst(ref, ref.getMessage().getPriority());
+ }
+ else
+ {
+ messageReferences.addLast(ref, ref.getMessage().getPriority());
+ }
+
+ if (!direct && promptDelivery)
+ {
+ //We have consumers with filters which don't match, so we need to prompt delivery every time
+ //a new message arrives - this is why you really shouldn't use filters with queues - in most cases
+ //it's an anti-pattern since it would cause a queue scan on each message
+ deliver();
+ }
+ }
+ }
+ // Reached for delivered, queued and scheduled references alike
+ return HandleStatus.HANDLED;
+ }
+
+   /*
+    * Returns true when the reference was handed to the timeout factory for later delivery
+    */
+   private boolean checkAndSchedule(MessageReference ref)
+   {
+      if (ref.getScheduledDeliveryTime() <= System.currentTimeMillis())
+      {
+         //Not scheduled for a future time - nothing to set up
+         return false;
+      }
+
+      if (trace) { log.trace("Scheduling delivery for " + ref + " to occur at " + ref.getScheduledDeliveryTime()); }
+
+      //Register a DeliverRefTimeoutTarget with the timeout factory for the scheduled time
+      Timeout pending = MessagingTimeoutFactory.instance.getFactory().
+         schedule(ref.getScheduledDeliveryTime(), new DeliverRefTimeoutTarget(ref));
+
+      scheduledTimeouts.add(pending);
+
+      return true;
+   }
+
+   //Despite its name this returns true when there is still room, false when the queue is full
+   private boolean checkFull()
+   {
+      boolean unbounded = maxSize == -1;
+      if (unbounded || (messageReferences.size() + scheduledTimeouts.size()) < maxSize)
+      {
+         return true;
+      }
+
+      if (trace) { log.trace(this + " queue is full, rejecting message"); }
+
+      return false;
+   }
+
+ // Offers the reference to the consumers in turn, starting at pos, until one of them handles it
+ private HandleStatus deliver(MessageReference reference)
+ {
+ if (consumers.isEmpty())
+ {
+ return HandleStatus.BUSY;
+ }
+ // Remember where this round started so that a full lap can be detected
+ int startPos = pos;
+ boolean filterRejected = false;
+
+ while (true)
+ {
+ Consumer consumer = consumers.get(pos);
+ // pos is advanced before the consumer is called, so the next delivery starts at the following position
+ pos = distributionPolicy.select(consumers, pos);
+
+ HandleStatus status = consumer.handle(reference);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ return HandleStatus.HANDLED;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ filterRejected = true;
+ }
+ // NOTE(review): the loop only ends here if the policy eventually returns startPos again
+ if (pos == startPos)
+ {
+ //Tried all of them
+ if (filterRejected)
+ {
+ return HandleStatus.NO_MATCH;
+ }
+ else
+ {
+ //Give up - all consumers busy
+ return HandleStatus.BUSY;
+ }
+ }
+ }
+ }
+
+ // Inner classes --------------------------------------------------------------------------
+
+   //Timeout callback that attempts delivery of a reference whose scheduled time has arrived
+   private class DeliverRefTimeoutTarget implements TimeoutTarget
+   {
+      private final MessageReference ref;
+
+      public DeliverRefTimeoutTarget(MessageReference ref)
+      {
+         this.ref = ref;
+      }
+
+      public void timedOut(Timeout timeout)
+      {
+         if (trace) { log.trace("Scheduled delivery timeout " + ref); }
+
+         //Every other access to scheduledTimeouts, consumers and pos is guarded by the
+         //queue's own monitor (the synchronized methods), so take that same lock here
+         //rather than locking scheduledTimeouts, which excluded none of those methods
+         synchronized (QueueImpl.this)
+         {
+            if (!scheduledTimeouts.remove(timeout))
+            {
+               throw new IllegalStateException("Failed to remove timeout " + timeout);
+            }
+
+            ref.setScheduledDeliveryTime(0);
+
+            HandleStatus status = deliver(ref);
+            if (HandleStatus.HANDLED != status)
+            {
+               //Add back to the front of the queue
+               addFirst(ref);
+            }
+            else
+            {
+               if (trace) { log.trace("Delivered scheduled delivery at " + System.currentTimeMillis() + " for " + ref); }
+            }
+         }
+      }
+   }
+
+
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/RoundRobinDistributionPolicy.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.util.List;
+
+import org.jboss.messaging.newcore.intf.Consumer;
+import org.jboss.messaging.newcore.intf.DistributionPolicy;
+
+/**
+ *
+ * A RoundRobinDistributionPolicy
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RoundRobinDistributionPolicy implements DistributionPolicy
+{
+   /*
+    * Returns the position that follows pos, wrapping round to 0 when that
+    * position equals the size of the consumer list. A pos of -1 marks the
+    * first call and yields 0.
+    */
+   public int select(List<Consumer> consumers, int pos)
+   {
+      if (pos == -1)
+      {
+         //First time
+         return 0;
+      }
+
+      //Advance one slot, wrapping around at the end of the list
+      int next = pos + 1;
+
+      return next == consumers.size() ? 0 : next;
+   }
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,214 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.PersistenceManager;
+import org.jboss.messaging.newcore.intf.Transaction;
+import org.jboss.messaging.newcore.intf.TransactionSynchronization;
+
+/**
+ *
+ * A TransactionImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TransactionImpl implements Transaction
+{
+ private static final Logger log = Logger.getLogger(TransactionImpl.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ private List<MessageReference> refsToAdd;
+
+ private List<MessageReference> refsToRemove;
+
+ private List<TransactionSynchronization> synchronizations = new ArrayList<TransactionSynchronization>();
+
+ private Xid xid;
+
+ private boolean containsPersistent;
+
+ public TransactionImpl(List<MessageReference> refsToAdd, List<MessageReference> refsToRemove,
+ boolean containsPersistent)
+ {
+ this.refsToAdd = refsToAdd;
+
+ this.refsToRemove = refsToRemove;
+
+ this.containsPersistent = containsPersistent;
+ }
+
+ // Transaction implementation -----------------------------------------------------------
+
+ public void addSynchronization(TransactionSynchronization sync)
+ {
+ synchronizations.add(sync);
+ }
+
+ public void prepare(PersistenceManager persistenceManager)
+ {
+ if (xid == null)
+ {
+ throw new IllegalStateException("Cannot call prepare() on a non XA transaction");
+ }
+ else
+ {
+
+ }
+ }
+
+   /*
+    * Commits the transaction: when it contains persistent work the reliable references are
+    * persisted first, then refsToAdd are added to their queues, with synchronizations around it.
+    */
+   public void commit(PersistenceManager persistenceManager) throws Exception
+   {
+      callSynchronizations(SyncType.BEFORE_COMMIT);
+
+      if (containsPersistent)
+      {
+         if (xid == null)
+         {
+            //1PC commit
+            PersistenceTransaction tx = null;
+
+            try
+            {
+               tx = persistenceManager.createTransaction(false);
+
+               playOperations(tx, persistenceManager);
+
+               tx.commit();
+            }
+            catch (Exception e)
+            {
+               //createTransaction() itself may have failed, leaving nothing to roll back
+               if (tx != null)
+               {
+                  try
+                  {
+                     tx.rollback();
+                  }
+                  catch (Throwable t)
+                  {
+                     if (trace) { log.trace("Failed to rollback", t); }
+                  }
+               }
+               throw e;
+            }
+         }
+         else
+         {
+            //2PC commit
+            PersistenceTransaction tx = persistenceManager.getTransaction(xid);
+
+            playOperations(tx, persistenceManager);
+
+            tx.commit();
+         }
+      }
+
+      //Now add to queue(s)
+      for (MessageReference reference: refsToAdd)
+      {
+         reference.getQueue().addLast(reference);
+      }
+
+      callSynchronizations(SyncType.AFTER_COMMIT);
+   }
+
+ public void rollback(PersistenceManager persistenceManager) throws Exception
+ {
+ callSynchronizations(SyncType.BEFORE_ROLLBACK);
+
+ if (xid == null)
+ {
+ //1PC rollback - nothing to do
+ }
+ else
+ {
+
+ }
+ callSynchronizations(SyncType.AFTER_ROLLBACK);
+
+ }
+
+ // Private -------------------------------------------------------------------
+
+ private void playOperations(PersistenceTransaction tx, PersistenceManager persistenceManager) throws Exception
+ {
+ for (MessageReference reference: refsToAdd)
+ {
+ if (reference.getMessage().isReliable())
+ {
+ persistenceManager.addReference(tx, reference.getQueue(), reference);
+ }
+ }
+
+ for (MessageReference reference: refsToRemove)
+ {
+ if (reference.getMessage().isReliable())
+ {
+ persistenceManager.removeReference(tx, reference.getQueue(), reference);
+ }
+ }
+ }
+
+   //Invokes only the callback matching 'type'; a stray ';' used to make afterRollback() run every time
+   private void callSynchronizations(SyncType type) throws Exception
+   {
+      for (TransactionSynchronization sync: synchronizations)
+      {
+         switch (type)
+         {
+            case BEFORE_COMMIT:
+               sync.beforeCommit();
+               break;
+            case AFTER_COMMIT:
+               sync.afterCommit();
+               break;
+            case BEFORE_ROLLBACK:
+               sync.beforeRollback();
+               break;
+            case AFTER_ROLLBACK:
+               sync.afterRollback();
+               break;
+         }
+      }
+   }
+
+ // Inner Enums -------------------------------------------------------------------------------
+
+ private enum SyncType
+ {
+ BEFORE_COMMIT, AFTER_COMMIT, BEFORE_ROLLBACK, AFTER_ROLLBACK;
+ }
+
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJECursor.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl.bdbje;
+
+import org.jboss.messaging.util.Pair;
+
+/**
+ *
+ * A forward-only cursor over the records of a BDBJEDatabase.
+ *
+ * Each record is exposed as a pair of its long id and its raw byte[] value.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJECursor
+{
+ /**
+  * Advances the cursor and returns the record it now points at.
+  *
+  * @return the id / value pair of the next record, or null when there are no more
+  *         records (callers in this package loop until null is returned)
+  * @throws Exception if the underlying store fails
+  */
+ Pair<Long, byte[]> getNext() throws Exception;
+
+ /**
+  * Closes the cursor.
+  *
+  * @throws Exception if the underlying store fails
+  */
+ void close() throws Exception;
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEDatabase.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,26 @@
+package org.jboss.messaging.newcore.impl.bdbje;
+
+
+/**
+ *
+ * Abstraction of a single Berkeley DB Java Edition database whose records are
+ * keyed by a long id and hold a raw byte[] value.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJEDatabase
+{
+ /**
+  * Writes bytes into the record stored under id.
+  *
+  * offset and length identify the region of the stored record that bytes replaces.
+  * Callers writing a complete record pass 0 and bytes.length.
+  *
+  * @param tx the transaction to write in; callers in this package pass null when no
+  *        explicit transaction object is used
+  * @param id the record key
+  * @param bytes the data to write
+  * @param offset start of the region of the stored record being written
+  * @param length length of the region of the stored record being written
+  * @throws Exception if the underlying store fails
+  */
+ void put(BDBJETransaction tx, long id, byte[] bytes, int offset, int length) throws Exception;
+
+ /**
+  * Removes the record stored under id.
+  *
+  * @param tx the transaction to remove in, may be null
+  * @param id the record key
+  * @throws Exception if the underlying store fails
+  */
+ void remove(BDBJETransaction tx, long id) throws Exception;
+
+ /**
+  * Closes the database.
+  *
+  * @throws Exception if the underlying store fails
+  */
+ void close() throws Exception;
+
+ /**
+  * Opens a new cursor over the records of this database. The caller is responsible
+  * for closing it.
+  *
+  * @return the cursor
+  * @throws Exception if the underlying store fails
+  */
+ BDBJECursor cursor() throws Exception;
+
+ //Only used for testing
+
+ /**
+  * @return the number of records in the database
+  * @throws Exception if the underlying store fails
+  */
+ long size() throws Exception;
+
+ /**
+  * @param id the record key
+  * @return the value stored under id; the implementation in this package returns
+  *         null when the lookup does not succeed
+  * @throws Exception if the underlying store fails
+  */
+ byte[] get(long id) throws Exception;
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEEnvironment.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,55 @@
+package org.jboss.messaging.newcore.impl.bdbje;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+/**
+ *
+ * Abstraction of a Berkeley DB Java Edition environment: lifecycle, configuration,
+ * access to databases and local transactions, and the XA operations used for
+ * two-phase commit.
+ *
+ * The implementation in this package rejects the configuration setters with an
+ * IllegalStateException once the environment has been started.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJEEnvironment
+{
+ /** Starts the environment; the environment path must have been set beforehand. */
+ void start() throws Exception;
+
+ /** Stops the environment. */
+ void stop() throws Exception;
+
+ /** Creates a new local (non XA) transaction. */
+ BDBJETransaction createTransaction() throws Exception;
+
+ /** Returns the database with the given name. */
+ BDBJEDatabase getDatabase(String databaseName) throws Exception;
+
+ /** Sets the file system path of the environment. */
+ void setEnvironmentPath(String environmentPath);
+
+ String getEnvironmentPath();
+
+ /** Sets whether the environment and its databases are transactional. */
+ void setTransacted(boolean transacted);
+
+ boolean isTransacted();
+
+ /** Sets the commit-time sync behaviour of the VM - presumably VM buffers to OS; verify against the implementation. */
+ void setSyncVM(boolean sync);
+
+ boolean isSyncVM();
+
+ /** Sets the commit-time sync behaviour of the OS - presumably OS buffers to disk; verify against the implementation. */
+ void setSyncOS(boolean sync);
+
+ boolean isSyncOS();
+
+ /** Sets the memory cache size in bytes; the implementation in this package treats -1 as "use the BDB default". */
+ void setMemoryCacheSize(long size);
+
+ long getMemoryCacheSize();
+
+ /** Starts XA work for the given xid. */
+ void startWork(Xid xid) throws Exception;
+
+ /**
+  * Ends the XA work started for the given xid.
+  *
+  * @param xid the transaction branch
+  * @param fail true if the work failed, false if it completed successfully
+  */
+ void endWork(Xid xid, boolean fail) throws Exception;
+
+ /** Prepares the XA transaction branch identified by xid. */
+ void prepare(Xid xid) throws Exception;
+
+ /** Commits the XA transaction branch identified by xid. */
+ void commit(Xid xid) throws Exception;
+
+ /** Rolls back the XA transaction branch identified by xid. */
+ void rollback(Xid xid) throws Exception;
+
+ /** Returns the xids reported by the environment as needing recovery. */
+ List<Xid> getInDoubtXids() throws Exception;
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,545 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl.bdbje;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.impl.MessageImpl;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.PersistenceManager;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.util.Pair;
+
+import com.sleepycat.je.DatabaseEntry;
+
+/**
+ *
+ * A PersistenceManager implementation that stores messages using Berkeley DB Java Edition.
+ *
+ * We store message data in one BDB JE Database and MessageReference data in another.
+ *
+ * Both Database instances are in the same BDB JE Environment. All databases in a single environment
+ * share the same log file and can participate in the same transaction.
+ *
+ * We store MessageReference data in a different Database since currently BDB JE is not optimised for partial
+ * updates - this means if we have a large message, then for every MessageReference we would have to update
+ * and store the entire message again - once for each reference as they are delivered / acknowledged - this
+ * can give very poor performance.
+ *
+ * TODO - Optimisation - If there is only one MessageReference per Message then we can store it in the same database
+ *
+ * For XA functionality we do not write explicit transaction records as it proves somewhat quicker to rely
+ * on BDB JE's inbuilt XA transaction capability.
+ *
+ * If messages are larger than <minLargeMessageSize> the messages are not stored in the BDB log but are stored
+ * as individual files in the <largeMessageRepositoryPath> directory referenced by a pointer from
+ * the log. This is because storing very large messages in a BDB log is not an efficient use of the log.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class BDBJEPersistenceManager implements PersistenceManager
+{
+   private static final Logger log = Logger.getLogger(BDBJEPersistenceManager.class);
+
+   private static final boolean trace = log.isTraceEnabled();
+
+   private static final int SIZE_LONG = 8;
+
+   private static final int SIZE_INT = 4;
+
+   private static final int SIZE_BYTE = 1;
+
+   // Size of one serialized reference: queue id + delivery count + scheduled delivery
+   public static final int SIZE_REF_DATA = SIZE_LONG + SIZE_INT + SIZE_LONG;
+
+   // Size of the fixed message fields: type + expiration + timestamp + priority
+   public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
+
+   private static final byte[] ZERO_LENGTH_BYTE_ARRAY = new byte[0];
+
+   public static final String MESSAGE_DB_NAME = "message";
+
+   public static final String REFERENCE_DB_NAME = "reference";
+
+   private boolean recovery;
+
+   private BDBJEEnvironment environment;
+
+   private BDBJEDatabase messageDB;
+
+   private BDBJEDatabase refDB;
+
+   private String largeMessageRepositoryPath;
+
+   private int minLargeMessageSize;
+
+   private String environmentPath;
+
+   private boolean started;
+
+   /**
+    * @param environment the BDB JE environment to store data in
+    * @param environmentPath the path handed to the environment when this manager is started
+    */
+   public BDBJEPersistenceManager(BDBJEEnvironment environment, String environmentPath)
+   {
+      this.environment = environment;
+
+      this.environmentPath = environmentPath;
+   }
+
+   // MessagingComponent implementation -----------------------------------------------------
+
+   /**
+    * Starts the environment and opens the message and reference databases.
+    * Does nothing if already started.
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      environment.setEnvironmentPath(environmentPath);
+
+      environment.start();
+
+      messageDB = environment.getDatabase(MESSAGE_DB_NAME);
+
+      refDB = environment.getDatabase(REFERENCE_DB_NAME);
+
+      started = true;
+   }
+
+   /**
+    * Closes both databases and stops the environment. Does nothing if not started.
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      messageDB.close();
+
+      refDB.close();
+
+      environment.stop();
+
+      started = false;
+   }
+
+   // PersistenceManager implementation ----------------------------------------------------------
+
+   /**
+    * Stores a message and its references in a single local transaction.
+    *
+    * @throws Exception if storing fails; the transaction is rolled back first. Previously
+    *         the failure was swallowed, so callers could not tell the message was not persisted.
+    */
+   public void commitMessage(Message message) throws Exception
+   {
+      BDBJETransaction tx = null;
+
+      try
+      {
+         tx = environment.createTransaction();
+
+         internalCommitMessage(tx, message);
+
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         rollbackQuietly(tx);
+
+         throw e;
+      }
+   }
+
+   /**
+    * Stores several messages and their references in a single local transaction.
+    * On failure the transaction is rolled back and the failure is logged.
+    */
+   public void commitMessages(List<Message> messages)
+   {
+      BDBJETransaction tx = null;
+
+      try
+      {
+         tx = environment.createTransaction();
+
+         for (Message message: messages)
+         {
+            internalCommitMessage(tx, message);
+         }
+
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         rollbackQuietly(tx);
+
+         // NOTE(review): this method declares no checked exceptions, so the failure cannot
+         // be rethrown without changing the signature - at least make it visible
+         log.error("Failed to commit messages", e);
+      }
+   }
+
+   /**
+    * Deletes several references in a single local transaction.
+    * On failure the transaction is rolled back and the failure is logged.
+    */
+   public void deleteReferences(List<MessageReference> references)
+   {
+      BDBJETransaction tx = null;
+
+      try
+      {
+         tx = environment.createTransaction();
+
+         for (MessageReference ref: references)
+         {
+            internalDeleteReference(tx, ref);
+         }
+
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         rollbackQuietly(tx);
+
+         // NOTE(review): no throws clause on this method - see commitMessages
+         log.error("Failed to delete references", e);
+      }
+   }
+
+   /**
+    * Deletes a single reference in its own local transaction.
+    * On failure the transaction is rolled back and the failure is logged.
+    */
+   public void deleteReference(MessageReference reference)
+   {
+      BDBJETransaction tx = null;
+
+      try
+      {
+         tx = environment.createTransaction();
+
+         internalDeleteReference(tx, reference);
+
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         rollbackQuietly(tx);
+
+         // NOTE(review): no throws clause on this method - see commitMessages
+         log.error("Failed to delete reference", e);
+      }
+   }
+
+   public List<Xid> getInDoubtXids() throws Exception
+   {
+      return environment.getInDoubtXids();
+   }
+
+   public void setInRecoveryMode(boolean recoveryMode)
+   {
+      this.recovery = recoveryMode;
+   }
+
+   public boolean isInRecoveryMode()
+   {
+      return recovery;
+   }
+
+   /**
+    * Reads every stored message together with its reference record and adds a reference
+    * to each queue in the supplied map that the message was routed to. References to
+    * queues missing from the map are ignored.
+    *
+    * The message and reference cursors are advanced in lock step and are always closed,
+    * also when loading fails.
+    *
+    * @param queues the deployed queues, keyed by queue id
+    * @throws IllegalStateException if a message has no matching reference record
+    */
+   public void loadQueues(Map<Long, Queue> queues) throws Exception
+   {
+      BDBJECursor cursorMessage = messageDB.cursor();
+
+      try
+      {
+         BDBJECursor cursorRef = refDB.cursor();
+
+         try
+         {
+            Pair<Long, byte[]> messagePair;
+
+            while ((messagePair = cursorMessage.getNext()) != null)
+            {
+               Pair<Long, byte[]> refPair = cursorRef.getNext();
+
+               if (refPair == null)
+               {
+                  throw new IllegalStateException("Message and ref data out of sync");
+               }
+
+               Message message = readMessage(messagePair.a, messagePair.b);
+
+               loadReferences(message, refPair.b, queues);
+            }
+         }
+         finally
+         {
+            cursorRef.close();
+         }
+      }
+      finally
+      {
+         cursorMessage.close();
+      }
+   }
+
+   /**
+    * Stores the messages as part of the XA transaction branch identified by xid and
+    * prepares that branch.
+    *
+    * @throws Exception if storing fails; the XA work is then ended as failed before rethrowing
+    */
+   public void prepareMessages(Xid xid, List<Message> messages) throws Exception
+   {
+      environment.startWork(xid);
+
+      try
+      {
+         for (Message message: messages)
+         {
+            // No explicit transaction object is passed for XA work
+            internalCommitMessage(null, message);
+         }
+      }
+      catch (Exception e)
+      {
+         try
+         {
+            environment.endWork(xid, true);
+         }
+         catch (Throwable ignore)
+         {
+            if (trace) { log.trace("Failed to end", ignore); }
+         }
+
+         throw e;
+      }
+
+      environment.endWork(xid, false);
+
+      environment.prepare(xid);
+   }
+
+   public void commitPreparedMessages(Xid xid) throws Exception
+   {
+      environment.commit(xid);
+   }
+
+   public void unprepareMessages(Xid xid, List<Message> messages) throws Exception
+   {
+      environment.rollback(xid);
+   }
+
+   /**
+    * Overwrites the stored delivery count of a reference in place, without rewriting
+    * the rest of the reference record.
+    *
+    * @throws IllegalStateException if the reference is not among its message's references
+    */
+   public void updateDeliveryCount(Queue queue, MessageReference ref) throws Exception
+   {
+      //TODO - optimise this scan
+
+      int pos = ref.getMessage().getReferences().indexOf(ref);
+
+      if (pos < 0)
+      {
+         throw new IllegalStateException("Reference does not belong to its message");
+      }
+
+      // The delivery count follows the queue id inside each serialized reference
+      int offset = pos * SIZE_REF_DATA + SIZE_LONG;
+
+      byte[] bytes = new byte[SIZE_INT];
+
+      ByteBuffer buff = ByteBuffer.wrap(bytes);
+
+      buff.putInt(ref.getDeliveryCount());
+
+      refDB.put(null, ref.getMessage().getMessageID(), bytes, offset, SIZE_INT);
+   }
+
+   // Public ----------------------------------------------------------------------------------
+
+   public String getLargeMessageRepositoryPath()
+   {
+      return largeMessageRepositoryPath;
+   }
+
+   public void setLargeMessageRepository(String largeMessageRepositoryPath)
+   {
+      this.largeMessageRepositoryPath = largeMessageRepositoryPath;
+   }
+
+   public int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+
+   public void setMinLargeMessageSize(int minLargeMessageSize)
+   {
+      this.minLargeMessageSize = minLargeMessageSize;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   public void setStarted(boolean started)
+   {
+      this.started = started;
+   }
+
+   // Private ---------------------------------------------------------------------------------
+
+   /**
+    * Rolls back the transaction if there is one; a rollback failure is only traced.
+    */
+   private void rollbackQuietly(BDBJETransaction tx)
+   {
+      try
+      {
+         if (tx != null)
+         {
+            tx.rollback();
+         }
+      }
+      catch (Throwable ignore)
+      {
+         if (trace) { log.trace("Failed to rollback", ignore); }
+      }
+   }
+
+   /**
+    * Rebuilds a message from its stored form: the fixed fields, then the length-prefixed
+    * headers, then the length-prefixed payload.
+    */
+   private Message readMessage(long id, byte[] bytes)
+   {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+      int type = buffer.getInt();
+
+      long expiration = buffer.getLong();
+
+      long timestamp = buffer.getLong();
+
+      byte priority = buffer.get();
+
+      //TODO we can optimise this to prevent a copy - let the message use a window on the byte[]
+
+      byte[] headers = new byte[buffer.getInt()];
+
+      buffer.get(headers);
+
+      byte[] payload = new byte[buffer.getInt()];
+
+      buffer.get(payload);
+
+      return new MessageImpl(id, type, true, expiration, timestamp, priority,
+                             headers, payload);
+   }
+
+   /**
+    * Decodes the serialized references of a message and adds one reference to every
+    * referenced queue that is present in the map.
+    */
+   private void loadReferences(Message message, byte[] refBytes, Map<Long, Queue> queues)
+   {
+      ByteBuffer buffer = ByteBuffer.wrap(refBytes);
+
+      while (buffer.hasRemaining())
+      {
+         long queueID = buffer.getLong();
+
+         int deliveryCount = buffer.getInt();
+
+         long scheduledDeliveryTime = buffer.getLong();
+
+         Queue queue = queues.get(queueID);
+
+         if (queue == null)
+         {
+            //Ok - queue is not deployed
+            continue;
+         }
+
+         MessageReference reference = message.createReference(queue);
+
+         reference.setDeliveryCount(deliveryCount);
+
+         reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+
+         queue.addLast(reference);
+      }
+   }
+
+   /**
+    * Serializes the message and all of its references and writes them to the message
+    * and reference databases under the message id.
+    *
+    * A failure to write the message now propagates to the caller. Previously it was
+    * printed to stderr and the reference record was written regardless.
+    *
+    * @param tx the transaction to write in, or null
+    */
+   private void internalCommitMessage(BDBJETransaction tx, Message message) throws Exception
+   {
+      //First store the message
+
+      byte[] headers = message.getHeadersAsByteArray();
+
+      int headersLength = headers.length;
+
+      byte[] payload = message.getPayload();
+
+      int payloadLength = payload == null ? 0 : payload.length;
+
+      //TODO - avoid copying by having message already store its byte representation or do partial
+      //update in BDB
+      byte[] bytes = new byte[SIZE_FIELDS + 2 * SIZE_INT + headersLength + payloadLength];
+
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+      //Put the fields
+      buffer.putInt(message.getType());
+      buffer.putLong(message.getExpiration());
+      buffer.putLong(message.getTimestamp());
+      buffer.put(message.getPriority());
+
+      buffer.putInt(headersLength);
+      buffer.put(headers);
+
+      buffer.putInt(payloadLength);
+      if (payload != null)
+      {
+         buffer.put(payload);
+      }
+
+      //Now the ref(s)
+
+      byte[] refBytes = new byte[message.getReferences().size() * SIZE_REF_DATA];
+
+      ByteBuffer buff = ByteBuffer.wrap(refBytes);
+
+      for (MessageReference ref: message.getReferences())
+      {
+         buff.putLong(ref.getQueue().getID());
+         buff.putInt(ref.getDeliveryCount());
+         buff.putLong(ref.getScheduledDeliveryTime());
+      }
+
+      messageDB.put(tx, message.getMessageID(), bytes, 0, bytes.length);
+
+      refDB.put(tx, message.getMessageID(), refBytes, 0, refBytes.length);
+   }
+
+   /**
+    * Deletes a reference from the store and from its message's reference list.
+    *
+    * When it is the message's only reference, the message and reference records are
+    * removed. Otherwise only the reference's region of the reference record is cut out.
+    *
+    * @param tx the transaction to write in, or null
+    * @throws IllegalStateException if the reference is not among its message's references
+    */
+   private void internalDeleteReference(BDBJETransaction tx, MessageReference reference) throws Exception
+   {
+      Message message = reference.getMessage();
+
+      boolean deleteAll = message.getReferences().size() == 1;
+
+      if (deleteAll)
+      {
+         refDB.remove(tx, message.getMessageID());
+
+         messageDB.remove(tx, message.getMessageID());
+
+         message.getReferences().remove(0);
+      }
+      else
+      {
+         //TODO - this can be optimised so not to scan using indexOf
+
+         int pos = message.getReferences().indexOf(reference);
+
+         if (pos < 0)
+         {
+            throw new IllegalStateException("Reference does not belong to its message");
+         }
+
+         int offset = pos * SIZE_REF_DATA;
+
+         // Replacing the reference's region with zero bytes removes it from the record
+         refDB.put(tx, message.getMessageID(), ZERO_LENGTH_BYTE_ARRAY, offset, SIZE_REF_DATA);
+
+         message.getReferences().remove(pos);
+      }
+   }
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJETransaction.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,15 @@
+package org.jboss.messaging.newcore.impl.bdbje;
+
+/**
+ *
+ * A local transaction obtained from a BDBJEEnvironment.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BDBJETransaction
+{
+   /** Commits the work done in this transaction. */
+   void commit() throws Exception;
+
+   /** Discards the work done in this transaction. */
+   void rollback() throws Exception;
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEDatabase.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,161 @@
+package org.jboss.messaging.newcore.impl.bdbje.integration;
+
+import org.jboss.messaging.newcore.impl.bdbje.BDBJECursor;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJETransaction;
+import org.jboss.messaging.util.Pair;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
+/**
+ *
+ * A RealBDBJEDatabase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealBDBJEDatabase implements BDBJEDatabase
+{
+ private Database database;
+
+ RealBDBJEDatabase(Database database)
+ {
+ this.database = database;
+ }
+
+ // BDBJEDatabase implementation ------------------------------------------
+
+ public void put(BDBJETransaction tx, long id, byte[] bytes, int offset, int length) throws Exception
+ {
+ DatabaseEntry key = createKey(id);
+
+ DatabaseEntry value = new DatabaseEntry();
+
+ if (offset != 0)
+ {
+ value.setPartial(offset, length, true);
+ }
+
+ value.setData(bytes);
+
+ Transaction bdbTx = getBDBTx(tx);
+
+ database.put(bdbTx, key, value);
+ }
+
+ public void remove(BDBJETransaction tx, long id) throws Exception
+ {
+ DatabaseEntry key = createKey(id);
+
+ Transaction bdbTx = getBDBTx(tx);
+
+ database.delete(bdbTx, key);
+ }
+
+ public BDBJECursor cursor() throws Exception
+ {
+ return new RealBDBJECursor();
+ }
+
+ public void close() throws Exception
+ {
+ database.close();
+ }
+
+ // For testing
+
+ public byte[] get(long id) throws Exception
+ {
+ DatabaseEntry key = createKey(id);
+
+ DatabaseEntry data = new DatabaseEntry();
+
+ if (database.get(null, key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS)
+ {
+ return null;
+ }
+ else
+ {
+ return data.getData();
+ }
+ }
+
+ public long size() throws Exception
+ {
+ return database.count();
+ }
+
+ // Private ----------------------------------------------------------------------
+
+ private DatabaseEntry createKey(long id)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+
+ EntryBinding keyBinding = new LongBinding();
+
+ keyBinding.objectToEntry(id, key);
+
+ return key;
+ }
+
+ private Transaction getBDBTx(BDBJETransaction tx)
+ {
+ Transaction bdbTx = null;
+
+ if (tx != null)
+ {
+ RealBDBJETransaction bdbJETx = (RealBDBJETransaction)tx;
+
+ bdbTx = bdbJETx.getTransaction();
+ }
+
+ return bdbTx;
+ }
+
+ // Inner classes ---------------------------------------------------------------------
+
+ private class RealBDBJECursor implements BDBJECursor
+ {
+ private Cursor cursor;
+
+ RealBDBJECursor() throws Exception
+ {
+ cursor = database.openCursor(null, null);
+ }
+
+ public Pair<Long, byte[]> getNext() throws Exception
+ {
+ DatabaseEntry key = new DatabaseEntry();
+
+ DatabaseEntry data = new DatabaseEntry();
+
+ if (cursor.getNext(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ long id = LongBinding.entryToLong(key);
+
+ byte[] bytes = data.getData();
+
+ Pair<Long, byte[]> pair = new Pair<Long, byte[]>(id, bytes);
+
+ return pair;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void close() throws Exception
+ {
+ cursor.close();
+ }
+ }
+
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJEEnvironment.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,390 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.newcore.impl.bdbje.integration;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEEnvironment;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJETransaction;
+
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.XAEnvironment;
+
+/**
+ *
+ * A BDBJEEnvironment implementation backed by a real Berkeley DB Java Edition XAEnvironment.
+ *
+ * In debug mode the XA operations issued by each thread are checked to be called in
+ * the expected order.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealBDBJEEnvironment implements BDBJEEnvironment
+{
+   /**
+    * The actual DB environment
+    */
+   private XAEnvironment environment;
+
+   /**
+    * The path of the environment
+    */
+   private String environmentPath;
+
+   /**
+    * Is the environment transacted?
+    */
+   private boolean transacted;
+
+   /**
+    * Do we sync OS buffers to disk on transaction commit?
+    */
+   private boolean syncOS;
+
+   /**
+    * Do we sync to the OS on transaction commit ?
+    */
+   private boolean syncVM;
+
+   /**
+    * Memory cache size in bytes, or -1 to use BDB default
+    *
+    */
+   private long memoryCacheSize = -1;
+
+   /**
+    * Are we started?
+    */
+   private volatile boolean started;
+
+   /**
+    * Are we in debug mode? Used in testing
+    */
+   private boolean debug;
+
+   /**
+    * Used for debug only to ensure the XA operations are called in the right order
+    */
+   private Map<Thread, ThreadTXStatus> threadTXStatuses;
+
+   /**
+    * @param debug true to track and verify the order of XA calls per thread
+    */
+   public RealBDBJEEnvironment(boolean debug)
+   {
+      this.debug = debug;
+
+      if (debug)
+      {
+         threadTXStatuses = new ConcurrentHashMap<Thread, ThreadTXStatus>();
+      }
+   }
+
+   /**
+    * Opens the underlying XAEnvironment using the configured settings, creating it
+    * if necessary.
+    *
+    * @throws IllegalStateException if already started or no environment path was set
+    */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Already started");
+      }
+      if (environmentPath == null)
+      {
+         throw new IllegalStateException("environmentPath has not been specified");
+      }
+
+      EnvironmentConfig envConfig = new EnvironmentConfig();
+
+      if (memoryCacheSize != -1)
+      {
+         envConfig.setCacheSize(memoryCacheSize);
+      }
+
+      // NOTE(review): confirm this mapping of syncOS / syncVM onto TxnNoSync / TxnWriteNoSync
+      // gives the intended durability for each combination of the two flags
+      envConfig.setTxnNoSync(!syncOS);
+
+      envConfig.setTxnWriteNoSync(!syncVM);
+
+      envConfig.setAllowCreate(true);
+
+      envConfig.setTransactional(transacted);
+
+      environment = new XAEnvironment(new File(environmentPath), envConfig);
+
+      started = true;
+   }
+
+   /**
+    * Closes the underlying environment. A failure to close is ignored.
+    *
+    * @throws IllegalStateException if not started
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("Not started");
+      }
+
+      try
+      {
+         environment.close();
+      }
+      catch (Exception ignore)
+      {
+         //Environment close might fail since there are open transactions - this is ok
+      }
+
+      started = false;
+   }
+
+   public BDBJETransaction createTransaction() throws Exception
+   {
+      return new RealBDBJETransaction(environment.beginTransaction(null, null));
+   }
+
+   /**
+    * Opens the named database, creating it if it does not exist.
+    */
+   public BDBJEDatabase getDatabase(String databaseName) throws Exception
+   {
+      DatabaseConfig dbConfig = new DatabaseConfig();
+
+      dbConfig.setTransactional(transacted);
+
+      dbConfig.setAllowCreate(true);
+
+      Database database = environment.openDatabase(null, databaseName, dbConfig);
+
+      BDBJEDatabase db = new RealBDBJEDatabase(database);
+
+      return db;
+   }
+
+   public String getEnvironmentPath()
+   {
+      return this.environmentPath;
+   }
+
+   public long getMemoryCacheSize()
+   {
+      return this.memoryCacheSize;
+   }
+
+   public boolean isSyncOS()
+   {
+      return this.syncOS;
+   }
+
+   public boolean isSyncVM()
+   {
+      return this.syncVM;
+   }
+
+   public boolean isTransacted()
+   {
+      return this.transacted;
+   }
+
+   public void setEnvironmentPath(String environmentPath)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set EnvironmentPath when started");
+      }
+      this.environmentPath = environmentPath;
+   }
+
+   public void setMemoryCacheSize(long size)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set MemoryCacheSize when started");
+      }
+      this.memoryCacheSize = size;
+   }
+
+   public void setSyncOS(boolean sync)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set SyncOS when started");
+      }
+      this.syncOS = sync;
+   }
+
+   public void setSyncVM(boolean sync)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set SyncVM when started");
+      }
+      this.syncVM = sync;
+   }
+
+   public void setTransacted(boolean transacted)
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set Transacted when started");
+      }
+      this.transacted = transacted;
+   }
+
+   /**
+    * @return the xids returned by a recovery scan of the environment
+    */
+   public List<Xid> getInDoubtXids() throws Exception
+   {
+      Xid[] xids = environment.recover(XAResource.TMSTARTRSCAN);
+
+      List<Xid> list = Arrays.asList(xids);
+
+      return list;
+   }
+
+   public void startWork(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkNoStatus();
+
+         threadTXStatuses.put(Thread.currentThread(), new ThreadTXStatus(xid));
+      }
+
+      environment.start(xid, XAResource.TMNOFLAGS);
+   }
+
+   public void endWork(Xid xid, boolean failed) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.IN_WORK);
+
+         setXAState(XAState.DONE_WORK);
+      }
+
+      environment.end(xid, failed ? XAResource.TMFAIL : XAResource.TMSUCCESS);
+   }
+
+   public void prepare(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.DONE_WORK);
+
+         setXAState(XAState.PREPARE_CALLED);
+      }
+
+      environment.prepare(xid);
+   }
+
+   public void commit(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.PREPARE_CALLED);
+
+         threadTXStatuses.remove(Thread.currentThread());
+      }
+
+      environment.commit(xid, false);
+   }
+
+   public void rollback(Xid xid) throws Exception
+   {
+      if (debug)
+      {
+         checkXAState(xid, XAState.PREPARE_CALLED);
+
+         threadTXStatuses.remove(Thread.currentThread());
+      }
+
+      environment.rollback(xid);
+   }
+
+   // Private -------------------------------------------------------------------------
+
+   /*
+    * Used for debug only
+    */
+   private ThreadTXStatus getTxStatus()
+   {
+      return threadTXStatuses.get(Thread.currentThread());
+   }
+
+   /*
+    * Used for debug only - verifies that the calling thread has XA work in the expected
+    * state for the given xid. Xids are compared with equals() rather than by identity,
+    * so an equal Xid held in a different instance is accepted.
+    */
+   private void checkXAState(Xid xid, XAState state)
+   {
+      ThreadTXStatus status = getTxStatus();
+
+      if (status == null)
+      {
+         throw new IllegalStateException("Not started any xa work");
+      }
+
+      if (!state.equals(status.state))
+      {
+         throw new IllegalStateException("Invalid XAState expected " + state + " got " + status.state);
+      }
+
+      Xid expected = status.implicitXid;
+
+      boolean sameXid = xid == expected || (xid != null && xid.equals(expected));
+
+      if (!sameXid)
+      {
+         throw new IllegalStateException("Wrong xid");
+      }
+   }
+
+   /*
+    * Used for debug only - verifies that the calling thread has no XA work in progress
+    */
+   private void checkNoStatus()
+   {
+      ThreadTXStatus status = getTxStatus();
+
+      if (status != null)
+      {
+         throw new IllegalStateException("XA status should not exist");
+      }
+   }
+
+   private void setXAState(XAState state)
+   {
+      threadTXStatuses.get(Thread.currentThread()).state = state;
+   }
+
+
+   // Inner classes --------------------------------------------------------------------
+
+   private enum XAState
+   {
+      NOT_STARTED, IN_WORK, DONE_WORK, PREPARE_CALLED
+   }
+
+   /*
+    * Used for debug only - the xid and XA state of the work a thread has in progress
+    */
+   private static class ThreadTXStatus
+   {
+      ThreadTXStatus(Xid xid)
+      {
+         this.implicitXid = xid;
+      }
+
+      Xid implicitXid;
+
+      XAState state = XAState.IN_WORK;
+   }
+
+}
Added: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/integration/RealBDBJETransaction.java 2007-12-05 19:46:16 UTC (rev 3415)
@@ -0,0 +1,38 @@
+package org.jboss.messaging.newcore.impl.bdbje.integration;
+
+import org.jboss.messaging.newcore.impl.bdbje.BDBJETransaction;
+
+import com.sleepycat.je.Transaction;
+
+/**
+ *
+ * A BDBJETransaction that wraps a native Berkeley DB Java Edition Transaction.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealBDBJETransaction implements BDBJETransaction
+{
+   /** The wrapped native transaction */
+   private final Transaction transaction;
+
+   RealBDBJETransaction(Transaction transaction)
+   {
+      this.transaction = transaction;
+   }
+
+   /** Commits the wrapped transaction. */
+   public void commit() throws Exception
+   {
+      this.transaction.commit();
+   }
+
+   /** Aborts the wrapped transaction. */
+   public void rollback() throws Exception
+   {
+      this.transaction.abort();
+   }
+
+   /** @return the wrapped native transaction */
+   public Transaction getTransaction()
+   {
+      return this.transaction;
+   }
+
+}
More information about the jboss-cvs-commits
mailing list