[jboss-cvs] JBoss Messaging SVN: r2798 - in trunk/src/main/org/jboss/messaging/core/impl: message and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 25 18:28:37 EDT 2007
Author: timfox
Date: 2007-06-25 18:28:37 -0400 (Mon, 25 Jun 2007)
New Revision: 2798
Added:
trunk/src/main/org/jboss/messaging/core/impl/message/
trunk/src/main/org/jboss/messaging/core/impl/message/CoreMessage.java
trunk/src/main/org/jboss/messaging/core/impl/message/MessageFactory.java
trunk/src/main/org/jboss/messaging/core/impl/message/MessageHolder.java
trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java
trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageReference.java
trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageStore.java
Log:
added missing dir
Added: trunk/src/main/org/jboss/messaging/core/impl/message/CoreMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/CoreMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/CoreMessage.java 2007-06-25 22:28:37 UTC (rev 2798)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.message;
+
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2202 $</tt>
+ *
+ * $Id: CoreMessage.java 2202 2007-02-08 10:50:26Z timfox $
+ */
+public class CoreMessage extends MessageSupport
+{
+ // Constants -----------------------------------------------------
+
+ private static final long serialVersionUID = -4740357138097778538L;
+
+ public static final byte TYPE = 127;
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Required by externalization.
+ */
+ public CoreMessage()
+ {
+ }
+
+ public CoreMessage(long messageID,
+ boolean reliable,
+ long expiration,
+ long timestamp,
+ byte priority,
+ Map headers,
+ byte[] payload)
+ {
+ super(messageID, reliable, expiration, timestamp, priority, headers, payload);
+ }
+
+ // Public --------------------------------------------------------
+
+ public String toString()
+ {
+ return "CoreMessage["+messageID+"]";
+ }
+
+ public byte getType()
+ {
+ return TYPE;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/message/MessageFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/MessageFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/MessageFactory.java 2007-06-25 22:28:37 UTC (rev 2798)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.message;
+
+import java.util.Map;
+
+import org.jboss.jms.message.JBossBytesMessage;
+import org.jboss.jms.message.JBossMapMessage;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.message.JBossObjectMessage;
+import org.jboss.jms.message.JBossStreamMessage;
+import org.jboss.jms.message.JBossTextMessage;
+import org.jboss.messaging.core.contract.Message;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2284 $</tt>
+ *
+ * $Id: MessageFactory.java 2284 2007-02-13 06:47:23Z ovidiu.feodorov at jboss.com $
+ */
+public class MessageFactory
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ public static Message createMessage(byte type)
+ {
+ Message m = null;
+
+ if (type == JBossMessage.TYPE) //1
+ {
+ m = new JBossMessage();
+ }
+ else if (type == JBossObjectMessage.TYPE) //2
+ {
+ m = new JBossObjectMessage();
+ }
+ else if (type == JBossTextMessage.TYPE) //3
+ {
+ m = new JBossTextMessage();
+ }
+ else if (type == JBossBytesMessage.TYPE) //4
+ {
+ m = new JBossBytesMessage();
+ }
+ else if (type == JBossMapMessage.TYPE) //5
+ {
+ m = new JBossMapMessage();
+ }
+ else if (type == JBossStreamMessage.TYPE) //6
+ {
+ m = new JBossStreamMessage();
+ }
+ else if (type == CoreMessage.TYPE) //127
+ {
+ m = new CoreMessage();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid type " + type);
+ }
+
+ return m;
+ }
+
+ /*
+ * Create a message from persistent storage
+ */
+ public static Message createMessage(long messageID,
+ boolean reliable,
+ long expiration,
+ long timestamp,
+ byte priority,
+ Map headers,
+ byte[] payload,
+ byte type)
+
+ {
+ Message m = null;
+
+ switch (type)
+ {
+ case JBossMessage.TYPE:
+ {
+ m = new JBossMessage(messageID, reliable, expiration,
+ timestamp, priority, headers, payload);
+ break;
+ }
+ case JBossObjectMessage.TYPE:
+ {
+ m = new JBossObjectMessage(messageID, reliable, expiration,
+ timestamp, priority, headers, payload);
+ break;
+ }
+ case JBossTextMessage.TYPE:
+ {
+ m = new JBossTextMessage(messageID, reliable, expiration,
+ timestamp, priority, headers, payload);
+ break;
+ }
+ case JBossBytesMessage.TYPE:
+ {
+ m = new JBossBytesMessage(messageID, reliable, expiration,
+ timestamp, priority, headers, payload);
+ break;
+ }
+ case JBossMapMessage.TYPE:
+ {
+ m = new JBossMapMessage(messageID, reliable, expiration,
+ timestamp, priority, headers, payload);
+ break;
+ }
+ case JBossStreamMessage.TYPE:
+ {
+ m = new JBossStreamMessage(messageID, reliable, expiration,
+ timestamp, priority, headers, payload);
+ break;
+ }
+ case CoreMessage.TYPE:
+ {
+ m = new CoreMessage(messageID, reliable, expiration,
+ timestamp, priority, headers, payload);
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Unknown type " + type);
+ }
+ }
+
+ m.setPersisted(true);
+
+ return m;
+ }
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ // Public ---------------------------------------------------------------------------------------
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+}
+
Added: trunk/src/main/org/jboss/messaging/core/impl/message/MessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/MessageHolder.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/MessageHolder.java 2007-06-25 22:28:37 UTC (rev 2798)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.impl.message;
+
+import org.jboss.messaging.core.contract.Message;
+
+
+/**
+ *
+ * A MessageHolder.
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2202 $</tt>
+ *
+ * $Id: MessageHolder.java 2202 2007-02-08 10:50:26Z timfox $
+ */
+class MessageHolder
+{
+ /*
+ * The number of channels *currently in memory* that hold a reference to the message
+ * We need this so we know when to evict the message from the store (when it reaches zero)
+ * Note that we also maintain a persistent channel count on the message itself.
+ * This is the total number of channels whether loaded in memory or not that hold a reference to the
+ * message and is needed to know when it is safe to remove the message from the db
+ */
+ private int inMemoryChannelCount;
+
+ private Message msg;
+
+ private SimpleMessageStore ms;
+
+ public MessageHolder(Message msg, SimpleMessageStore ms)
+ {
+ this.msg = msg;
+ this.ms = ms;
+ }
+
+ public synchronized void incrementInMemoryChannelCount()
+ {
+ inMemoryChannelCount++;
+ }
+
+ public synchronized void decrementInMemoryChannelCount()
+ {
+ inMemoryChannelCount--;
+
+ if (inMemoryChannelCount == 0)
+ {
+ // can remove the message from the message store
+ ms.forgetMessage(msg.getMessageID());
+ }
+ }
+
+ public synchronized int getInMemoryChannelCount()
+ {
+ return inMemoryChannelCount;
+ }
+
+ public Message getMessage()
+ {
+ return msg;
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/MessageSupport.java 2007-06-25 22:28:37 UTC (rev 2798)
@@ -0,0 +1,444 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Message;
+import org.jboss.messaging.util.StreamUtils;
+
+/**
+ * A message base.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2740 $</tt>
+ *
+ * Note this class is only serializable so messages can't be returned from JMX operations
+ * e.g. listAllMessages.
+ * For normal message transportation serialization is not used
+ *
+ * $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
+ */
+public abstract class MessageSupport implements Message, Serializable
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(MessageSupport.class);
+
+ // Attributes ----------------------------------------------------
+
+ private boolean trace = log.isTraceEnabled();
+
+ protected long messageID;
+
+ protected boolean reliable;
+
+ /** GMT milliseconds at which this message expires. 0 means never expires * */
+ protected long expiration;
+
+ protected long timestamp;
+
+ protected Map headers;
+
+ protected byte priority;
+
+ // Must be hidden from subclasses
+ private transient Object payload;
+
+ // Must be hidden from subclasses
+ private byte[] payloadAsByteArray;
+
+ private transient boolean persisted;
+
+ // Constructors --------------------------------------------------
+
+ /*
+ * Construct a message for deserialization or streaming
+ */
+ public MessageSupport()
+ {
+ }
+
+ /*
+ * Construct a message using default values
+ */
+ public MessageSupport(long messageID)
+ {
+ this(messageID, false, 0, System.currentTimeMillis(), (byte) 4, null,
+ null);
+ }
+
+ /*
+ * Construct a message using specified values
+ */
+ public MessageSupport(long messageID, boolean reliable, long expiration,
+ long timestamp, byte priority, Map headers, byte[] payloadAsByteArray)
+ {
+ this.messageID = messageID;
+ this.reliable = reliable;
+ this.expiration = expiration;
+ this.timestamp = timestamp;
+ this.priority = priority;
+ if (headers == null)
+ {
+ this.headers = new HashMap();
+ } else
+ {
+ this.headers = new HashMap(headers);
+ }
+
+ this.payloadAsByteArray = payloadAsByteArray;
+ }
+
+ /*
+ * Copy constructor
+ *
+ * Does a shallow copy of the payload
+ */
+ protected MessageSupport(MessageSupport that)
+ {
+ this.messageID = that.messageID;
+ this.reliable = that.reliable;
+ this.expiration = that.expiration;
+ this.timestamp = that.timestamp;
+ this.headers = new HashMap(that.headers);
+ this.priority = that.priority;
+ this.payload = that.payload;
+ this.payloadAsByteArray = that.payloadAsByteArray;
+ }
+
+ // Message implementation ----------------------------------------
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public boolean isReliable()
+ {
+ return reliable;
+ }
+
+ public long getExpiration()
+ {
+ return expiration;
+ }
+
+ public void setExpiration(long expiration)
+ {
+ this.expiration = expiration;
+ }
+
+ public long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public Object putHeader(String name, Object value)
+ {
+ return headers.put(name, value);
+ }
+
+ public Object getHeader(String name)
+ {
+ return headers.get(name);
+ }
+
+ public Object removeHeader(String name)
+ {
+ return headers.remove(name);
+ }
+
+ public boolean containsHeader(String name)
+ {
+ return headers.containsKey(name);
+ }
+
+ public Map getHeaders()
+ {
+ return headers;
+ }
+
+ public byte getPriority()
+ {
+ return priority;
+ }
+
+ public void setPriority(byte priority)
+ {
+ this.priority = priority;
+ }
+
+ public boolean isReference()
+ {
+ return false;
+ }
+
+ public synchronized byte[] getPayloadAsByteArray()
+ {
+ if (payloadAsByteArray == null && payload != null)
+ {
+ // convert the payload into a byte array and store internally
+
+ // TODO - investigate how changing the buffer size effects
+ // performance
+
+ // Ideally I would like to use the pre-existing DataOutputStream and
+ // not create another one - but would then have to add markers on the
+ // stream
+ // to signify the end of the payload
+ // This would have the advantage of us not having to allocate buffers
+ // here
+ // We could do this by creating our own FilterOutputStream that makes
+ // sure
+ // the end of marker sequence doesn't occur in the payload
+
+ final int BUFFER_SIZE = 2048;
+
+ try
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(BUFFER_SIZE);
+ DataOutputStream daos = new DataOutputStream(bos);
+ writePayload(daos, payload);
+ daos.close();
+ payloadAsByteArray = bos.toByteArray();
+ payload = null;
+ } catch (Exception e)
+ {
+ RuntimeException e2 = new RuntimeException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ }
+ return payloadAsByteArray;
+ }
+
+ /**
+ * Warning! Calling getPayload will cause the payload to be deserialized so
+ * should not be called on the server.
+ */
+ public synchronized Object getPayload()
+ {
+ if (payload != null)
+ {
+ return payload;
+ }
+ else if (payloadAsByteArray != null)
+ {
+ // deserialize the payload from byte[]
+
+ // TODO use the same DataInputStream as in the read() method and
+ // add markers on the stream to represent end of payload
+ ByteArrayInputStream bis = new ByteArrayInputStream(payloadAsByteArray);
+ DataInputStream dis = new DataInputStream(bis);
+ try
+ {
+ payload = readPayload(dis, payloadAsByteArray.length);
+ }
+ catch (Exception e)
+ {
+ RuntimeException e2 = new RuntimeException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+
+ payloadAsByteArray = null;
+ return payload;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void setPayload(Serializable payload)
+ {
+ this.payload = payload;
+ }
+
+ protected void clearPayloadAsByteArray()
+ {
+ this.payloadAsByteArray = null;
+ }
+
+ public synchronized boolean isPersisted()
+ {
+ return persisted;
+ }
+
+ public synchronized void setPersisted(boolean persisted)
+ {
+ this.persisted = persisted;
+ }
+
+ public boolean isExpired()
+ {
+ if (expiration == 0)
+ {
+ return false;
+ }
+ long overtime = System.currentTimeMillis() - expiration;
+ if (overtime >= 0)
+ {
+ // discard it
+ if (trace)
+ {
+ log.trace(this + " expired by " + overtime + " ms");
+ }
+
+ return true;
+ }
+ return false;
+ }
+
+ // Public --------------------------------------------------------
+
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (!(o instanceof MessageSupport))
+ {
+ return false;
+ }
+ MessageSupport that = (MessageSupport) o;
+ return that.messageID == this.messageID;
+ }
+
+ public int hashCode()
+ {
+ return (int) ((this.messageID >>> 32) ^ this.messageID);
+ }
+
+ public String toString()
+ {
+ return "M[" + messageID + "]";
+ }
+
+ // Streamable implementation ---------------------------------
+
+ public void write(DataOutputStream out) throws Exception
+ {
+ out.writeLong(messageID);
+
+ out.writeBoolean(reliable);
+
+ out.writeLong(expiration);
+
+ out.writeLong(timestamp);
+
+ StreamUtils.writeMap(out, headers, true);
+
+ out.writeByte(priority);
+
+ byte[] bytes = getPayloadAsByteArray();
+
+ if (bytes != null)
+ {
+ out.writeInt(bytes.length);
+
+ out.write(bytes);
+ } else
+ {
+ out.writeInt(0);
+ }
+ }
+
+ public void read(DataInputStream in) throws Exception
+ {
+ messageID = in.readLong();
+
+ reliable = in.readBoolean();
+
+ expiration = in.readLong();
+
+ timestamp = in.readLong();
+
+ headers = StreamUtils.readMap(in, true);
+
+ priority = in.readByte();
+
+ int length = in.readInt();
+
+ if (length == 0)
+ {
+ // no payload
+ payloadAsByteArray = null;
+ } else
+ {
+ payloadAsByteArray = new byte[length];
+
+ in.readFully(payloadAsByteArray);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ /**
+ * Override this if you want more sophisticated payload externalization.
+ *
+ * @throws Exception
+ * TODO
+ */
+ protected void writePayload(DataOutputStream out, Object thePayload)
+ throws Exception
+ {
+ StreamUtils.writeObject(out, thePayload, true, true);
+ }
+
+ /**
+ * Override this if you want more sophisticated payload externalization.
+ *
+ * @throws Exception
+ * TODO
+ */
+ protected Object readPayload(DataInputStream in, int length)
+ throws Exception
+ {
+ return StreamUtils.readObject(in, true);
+ }
+
+ /**
+ * It makes sense to use this method only from within JBossBytesMessage
+ * (optimization). Using it from anywhere else will lead to corrupted data.
+ */
+ protected final void copyPayloadAsByteArrayToPayload()
+ {
+ payload = payloadAsByteArray;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageReference.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageReference.java 2007-06-25 22:28:37 UTC (rev 2798)
@@ -0,0 +1,181 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.message;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Message;
+import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.contract.MessageStore;
+
+/**
+ * A Simple MessageReference implementation.
+ *
+ * Note that we do not need WeakReferences to message/holder objects since with the new
+ * lazy loading schema we guarantee that if a message ref is in memory - it's corresponding message is
+ * in memory too
+ *
+ * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @version <tt>1.3</tt>
+ *
+ * SimpleMessageReference.java,v 1.3 2006/02/23 17:45:57 timfox Exp
+ */
+public class SimpleMessageReference implements MessageReference
+{
+ private static final Logger log = Logger.getLogger(SimpleMessageReference.class);
+
+ // Attributes ----------------------------------------------------
+
+ private boolean trace = log.isTraceEnabled();
+
+ protected transient MessageStore ms;
+
+ private MessageHolder holder;
+
+ private long pagingOrder = -1;
+
+ private boolean released;
+
+ private int deliveryCount;
+
+ private long scheduledDeliveryTime;
+
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Required by externalization.
+ */
+ public SimpleMessageReference()
+ {
+ if (trace) { log.trace("Creating using default constructor"); }
+ }
+
+ public SimpleMessageReference(SimpleMessageReference other)
+ {
+ this.ms = other.ms;
+
+ this.holder = other.holder;
+
+ this.pagingOrder = other.pagingOrder;
+
+ this.released = other.released;
+
+ this.deliveryCount = other.deliveryCount;
+
+ this.scheduledDeliveryTime = other.scheduledDeliveryTime;
+ }
+
+ protected SimpleMessageReference(MessageHolder holder, MessageStore ms)
+ {
+ this.holder = holder;
+
+ this.ms = ms;
+ }
+
+ // Message implementation ----------------------------------------
+
+ public boolean isReference()
+ {
+ return true;
+ }
+
+ // MessageReference implementation -------------------------------
+
+ public int getDeliveryCount()
+ {
+ return deliveryCount;
+ }
+
+ public void setDeliveryCount(int deliveryCount)
+ {
+ this.deliveryCount = deliveryCount;
+ }
+
+ public long getScheduledDeliveryTime()
+ {
+ return scheduledDeliveryTime;
+ }
+
+ public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ {
+ this.scheduledDeliveryTime = scheduledDeliveryTime;
+ }
+
+ public Message getMessage()
+ {
+ return holder.getMessage();
+ }
+
+ public void releaseMemoryReference()
+ {
+ if (released)
+ {
+ //Do nothing -
+ //It's possible releaseMemoryReference can be called more than once on a reference since it's
+ //allowable that acknowledge is called more than once for a delivery and each call will call this
+ //method - so we don't want to throw an exception
+ return;
+ }
+ holder.decrementInMemoryChannelCount();
+
+ released = true;
+ }
+
+ public int getInMemoryChannelCount()
+ {
+ return holder.getInMemoryChannelCount();
+ }
+
+ public long getPagingOrder()
+ {
+ return pagingOrder;
+ }
+
+ public void setPagingOrder(long order)
+ {
+ this.pagingOrder = order;
+ }
+
+ public MessageReference copy()
+ {
+ SimpleMessageReference ref = new SimpleMessageReference(this);
+
+ ref.holder.incrementInMemoryChannelCount();
+
+ return ref;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String toString()
+ {
+ return "Reference[" + getMessage().getMessageID() + "]:" + (getMessage().isReliable() ? "RELIABLE" : "NON-RELIABLE");
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
\ No newline at end of file
Added: trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageStore.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/impl/message/SimpleMessageStore.java 2007-06-25 22:28:37 UTC (rev 2798)
@@ -0,0 +1,175 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.impl.message;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Message;
+import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.contract.MessageStore;
+
+/**
+ * A MessageStore implementation.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2202 $</tt>
+ *
+ * $Id: SimpleMessageStore.java 2202 2007-02-08 10:50:26Z timfox $
+ */
+public class SimpleMessageStore implements MessageStore
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(SimpleMessageStore.class);
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private boolean trace = log.isTraceEnabled();
+
+ // <messageID - MessageHolder>
+ private Map messages;
+
+ // Constructors --------------------------------------------------
+
+ public SimpleMessageStore()
+ {
+ messages = new HashMap();
+
+ log.debug(this + " initialized");
+ }
+
+ // MessageStore implementation ---------------------------
+
+ public Object getInstance()
+ {
+ return this;
+ }
+
+ // TODO If we can assume that the message is not known to the store before
+ // (true when sending messages)
+ // Then we can avoid synchronizing on this and use a ConcurrentHashmap
+ // Which will give us much better concurrency for many threads
+ public MessageReference reference(Message m)
+ {
+ MessageHolder holder;
+
+ synchronized (this)
+ {
+ holder = (MessageHolder)messages.get(new Long(m.getMessageID()));
+
+ if (holder == null)
+ {
+ holder = addMessage(m);
+ }
+ }
+ holder.incrementInMemoryChannelCount();
+
+ MessageReference ref = new SimpleMessageReference(holder, this);
+
+ if (trace) { log.trace(this + " generated " + ref + " for " + m); }
+
+ return ref;
+ }
+
+ public MessageReference reference(long messageID)
+ {
+ MessageHolder holder;
+
+ synchronized (this)
+ {
+ holder = (MessageHolder)messages.get(new Long(messageID));
+ }
+
+ if (holder == null)
+ {
+ return null;
+ }
+
+ MessageReference ref = new SimpleMessageReference(holder, this);
+
+ if (trace) { log.trace(this + " generates " + ref + " for " + messageID); }
+
+ holder.incrementInMemoryChannelCount();
+
+ return ref;
+ }
+
+
+ public boolean forgetMessage(long messageID)
+ {
+ return messages.remove(new Long(messageID)) != null;
+ }
+
+ public int size()
+ {
+ return messages.size();
+ }
+
+ public List messageIds()
+ {
+ return new ArrayList(messages.keySet());
+ }
+
+ // MessagingComponent implementation --------------------------------
+
+ public void start() throws Exception
+ {
+ //NOOP
+ }
+
+ public void stop() throws Exception
+ {
+ //NOOP
+ }
+
+ // Public --------------------------------------------------------
+
+ public String toString()
+ {
+ return "MemoryStore[" + System.identityHashCode(this) + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected MessageHolder addMessage(Message m)
+ {
+ MessageHolder holder = new MessageHolder(m, this);
+
+ messages.put(new Long(m.getMessageID()), holder);
+
+ return holder;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list