Author: timfox
Date: 2010-03-31 13:57:19 -0400 (Wed, 31 Mar 2010)
New Revision: 9037
Modified:
trunk/src/main/org/hornetq/api/core/FilterConstants.java
trunk/src/main/org/hornetq/api/core/Message.java
trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java
trunk/src/main/org/hornetq/core/filter/impl/FilterParser.jj
trunk/src/main/org/hornetq/core/filter/impl/SimpleStringReader.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
JMS message id optimisation
Modified: trunk/src/main/org/hornetq/api/core/FilterConstants.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/FilterConstants.java 2010-03-31 17:35:13 UTC (rev
9036)
+++ trunk/src/main/org/hornetq/api/core/FilterConstants.java 2010-03-31 17:57:19 UTC (rev
9037)
@@ -24,6 +24,11 @@
public class FilterConstants
{
/**
+ * Name of the HornetQ UserID header.
+ */
+ public static final SimpleString HORNETQ_USERID = new
SimpleString("HQUserID");
+
+ /**
* Name of the HornetQ Message expiration header.
*/
public static final SimpleString HORNETQ_EXPIRATION = new
SimpleString("HQExpiration");
Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java 2010-03-31 17:35:13 UTC (rev 9036)
+++ trunk/src/main/org/hornetq/api/core/Message.java 2010-03-31 17:57:19 UTC (rev 9037)
@@ -88,6 +88,19 @@
* The messageID is set when the message is handled by the server.
*/
long getMessageID();
+
+ /**
+ * Returns the userID - this is an optional user specified String that can be set to
identify the message
+ * and will be passed around with the message
+ * @return
+ */
+ SimpleString getUserID();
+
+ /**
+ * Sets the user ID
+ * @param userID
+ */
+ void setUserID(SimpleString userID);
/**
* Returns the address this message is sent to.
@@ -303,7 +316,7 @@
* @param value property value
*/
void putStringProperty(String key, String value);
-
+
/**
* Puts an Object property in this message.
* <br>
Modified: trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2010-03-31 17:35:13 UTC
(rev 9036)
+++ trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2010-03-31 17:57:19 UTC
(rev 9037)
@@ -38,6 +38,7 @@
* HQDurable - "DURABLE" or "NON_DURABLE"
* HQExpiration - the expiration of the message
* HQSize - the encoded size of the full message in bytes
+* HQUserID - the user specified ID string (if any)
* Any other identifers that appear in a filter expression represent header values for the
message
*
* String values must be set as <code>SimpleString</code>, not
<code>java.lang.String</code> (see JBMESSAGING-1307).
@@ -105,16 +106,15 @@
try
{
+ result = parser.parse(sfilterString, identifiers);
- result = parser.parse(sfilterString, identifiers);
resultType = result.getClass();
-
}
catch (Throwable e)
{
FilterImpl.log.error("Invalid filter: " + str, e);
- throw new HornetQException(HornetQException.INVALID_FILTER_EXPRESSION,
"Invalid filter: " + sfilterString);
+ throw new HornetQException(HornetQException.INVALID_FILTER_EXPRESSION,
"Invalid filter: " + sfilterString + " " + e.getMessage());
}
}
@@ -151,12 +151,15 @@
}
if (resultType.equals(Identifier.class))
+ {
return (Boolean)((Identifier)result).getValue();
+ }
else if (resultType.equals(Operator.class))
{
- Operator op = (Operator) result;
+ Operator op = (Operator)result;
return (Boolean)op.apply();
- } else
+ }
+ else
{
throw new Exception("Bad object type: " + result);
}
@@ -182,8 +185,12 @@
private Object getHeaderFieldValue(final ServerMessage msg, final SimpleString
fieldName)
{
- if (FilterConstants.HORNETQ_PRIORITY.equals(fieldName))
+ if (FilterConstants.HORNETQ_USERID.equals(fieldName))
{
+ return msg.getUserID();
+ }
+ else if (FilterConstants.HORNETQ_PRIORITY.equals(fieldName))
+ {
return new Integer(msg.getPriority());
}
else if (FilterConstants.HORNETQ_TIMESTAMP.equals(fieldName))
Modified: trunk/src/main/org/hornetq/core/filter/impl/FilterParser.jj
===================================================================
--- trunk/src/main/org/hornetq/core/filter/impl/FilterParser.jj 2010-03-31 17:35:13 UTC
(rev 9036)
+++ trunk/src/main/org/hornetq/core/filter/impl/FilterParser.jj 2010-03-31 17:57:19 UTC
(rev 9037)
@@ -22,6 +22,7 @@
options {
LOOKAHEAD=1;
+ UNICODE_INPUT=true;
/*
DEBUG_PARSER=true;
DEBUG_LOOKAHEAD=true;
Modified: trunk/src/main/org/hornetq/core/filter/impl/SimpleStringReader.java
===================================================================
--- trunk/src/main/org/hornetq/core/filter/impl/SimpleStringReader.java 2010-03-31
17:35:13 UTC (rev 9036)
+++ trunk/src/main/org/hornetq/core/filter/impl/SimpleStringReader.java 2010-03-31
17:57:19 UTC (rev 9037)
@@ -17,6 +17,7 @@
import java.io.Reader;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
/**
@@ -32,6 +33,8 @@
{
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(SimpleStringReader.class);
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-31 17:35:13 UTC
(rev 9036)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-31 17:57:19 UTC
(rev 9037)
@@ -85,6 +85,8 @@
private boolean copied = true;
private boolean bufferUsed;
+
+ private SimpleString userID;
// Constructors --------------------------------------------------
@@ -136,6 +138,7 @@
protected MessageImpl(final MessageImpl other)
{
messageID = other.getMessageID();
+ userID = other.getUserID();
address = other.getAddress();
type = other.getType();
durable = other.isDurable();
@@ -183,6 +186,7 @@
public int getHeadersAndPropertiesEncodeSize()
{
return DataConstants.SIZE_LONG + // Message ID
+ SimpleString.sizeofNullableString(userID) +
/* address */SimpleString.sizeofNullableString(address) +
DataConstants./* Type */SIZE_BYTE +
DataConstants./* Durable */SIZE_BOOLEAN +
@@ -197,6 +201,7 @@
{
buffer.writeLong(messageID);
buffer.writeNullableSimpleString(address);
+ buffer.writeNullableSimpleString(userID);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
@@ -209,6 +214,7 @@
{
messageID = buffer.readLong();
address = buffer.readNullableSimpleString();
+ userID = buffer.readNullableSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
@@ -238,6 +244,16 @@
{
return messageID;
}
+
+ public SimpleString getUserID()
+ {
+ return userID;
+ }
+
+ public void setUserID(final SimpleString userID)
+ {
+ this.userID = userID;
+ }
public SimpleString getAddress()
{
@@ -339,6 +355,7 @@
Map<String, Object> map = new HashMap<String, Object>();
map.put("messageID", messageID);
+ map.put("userID", userID);
map.put("address", address.toString());
map.put("type", type);
map.put("durable", durable);
@@ -516,7 +533,7 @@
bufferValid = false;
}
-
+
public void putObjectProperty(final SimpleString key, final Object value) throws
PropertyConversionException
{
if (value == null)
@@ -759,7 +776,7 @@
{
return properties.getSimpleStringProperty(new SimpleString(key));
}
-
+
public Object getObjectProperty(final String key)
{
return properties.getProperty(new SimpleString(key));
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-03-31 17:35:13 UTC
(rev 9036)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-03-31 17:57:19 UTC
(rev 9037)
@@ -1119,7 +1119,7 @@
{
ref = holder.iter.next();
}
-
+
if (ref == null)
{
nullRefCount++;
@@ -1168,6 +1168,7 @@
}
else if (status == HandleStatus.BUSY)
{
+ log.info("busy");
if (holder.iter == null)
{
// Put the ref back
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-03-31 17:35:13 UTC (rev
9036)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-03-31 17:57:19 UTC (rev
9037)
@@ -42,7 +42,6 @@
import org.hornetq.api.jms.HornetQJMSConstants;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.UUID;
/**
* HornetQ implementation of a JMS Message.
@@ -70,10 +69,6 @@
private static final SimpleString CORRELATIONID_HEADER_NAME = new
SimpleString("JMSCorrelationID");
- //public static final SimpleString HORNETQ_MESSAGE_ID = new
SimpleString("_HQI");
-
- public static final SimpleString JMSMESSAGEID_HEADER_NAME = new
SimpleString("JMSMessageID");
-
private static final SimpleString TYPE_HEADER_NAME = new
SimpleString("JMSType");
private static final SimpleString JMS = new SimpleString("JMS");
@@ -112,6 +107,10 @@
{
// Ignore
}
+ else if (entry.getKey().equals("userID"))
+ {
+ jmsMessage.put("JMSMessageID", entry.getValue().toString());
+ }
else
{
Object value = entry.getValue();
@@ -307,45 +306,17 @@
// javax.jmx.Message implementation ------------------------------
-// public String getJMSMessageID()
-// {
-// if (msgID == null)
-// {
-// byte[] bytes = message.getBytesProperty(HornetQMessage.HORNETQ_MESSAGE_ID);
-//
-// msgID = bytes == null ? null : "ID:" + new
UUID(UUID.TYPE_TIME_BASED, bytes).toString();
-// }
-// return msgID;
-// }
-
public String getJMSMessageID()
{
if (msgID == null)
{
- msgID = message.getStringProperty(HornetQMessage.JMSMESSAGEID_HEADER_NAME);
+ SimpleString uid = message.getUserID();
+
+ msgID = uid == null ? null : uid.toString();
}
return msgID;
}
-// public void setJMSMessageID(final String jmsMessageID) throws JMSException
-// {
-// if (jmsMessageID != null && !jmsMessageID.startsWith("ID:"))
-// {
-// throw new JMSException("JMSMessageID must start with ID:");
-// }
-//
-// if (jmsMessageID == null)
-// {
-// message.removeProperty(HornetQMessage.HORNETQ_MESSAGE_ID);
-// }
-// else
-// {
-// message.putStringProperty(HornetQMessage.HORNETQ_MESSAGE_ID, new
SimpleString(jmsMessageID));
-// }
-//
-// msgID = jmsMessageID;
-// }
-
public void setJMSMessageID(final String jmsMessageID) throws JMSException
{
if (jmsMessageID != null && !jmsMessageID.startsWith("ID:"))
@@ -353,18 +324,11 @@
throw new JMSException("JMSMessageID must start with ID:");
}
- if (jmsMessageID == null)
- {
- message.removeProperty(HornetQMessage.JMSMESSAGEID_HEADER_NAME);
- }
- else
- {
- message.putStringProperty(HornetQMessage.JMSMESSAGEID_HEADER_NAME, new
SimpleString(jmsMessageID));
- }
+ message.setUserID(null);
msgID = jmsMessageID;
}
-
+
public long getJMSTimestamp() throws JMSException
{
return message.getTimestamp();
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2010-03-31 17:35:13
UTC (rev 9036)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2010-03-31 17:57:19
UTC (rev 9037)
@@ -82,9 +82,9 @@
// Constructors --------------------------------------------------
protected HornetQMessageProducer(final HornetQConnection jbossConn,
- final ClientProducer producer,
- final HornetQDestination defaultDestination,
- final ClientSession clientSession) throws JMSException
+ final ClientProducer producer,
+ final HornetQDestination defaultDestination,
+ final ClientSession clientSession) throws
JMSException
{
this.jbossConn = jbossConn;
@@ -350,8 +350,8 @@
if (!destination.equals(defaultDestination))
{
throw new UnsupportedOperationException("Where a default destination
is specified " + "for the sender and a destination is "
- + "specified in the arguments to the send,
"
- + "these destinations must be equal");
+ + "specified in the arguments
to the send, "
+ + "these destinations must be
equal");
}
}
@@ -409,40 +409,22 @@
UUID uid = UUIDGenerator.getInstance().generateUUID();
- msg.getCoreMessage().putStringProperty(HornetQMessage.JMSMESSAGEID_HEADER_NAME,
new SimpleString("ID:" + uid.toString()));
+ byte[] bytes = uid.asBytes();
- msg.resetMessageID(null);
+ byte[] id = new byte[6 + 16];
+
+ id[0] = (byte)'I';
+ id[2] = (byte)'D';
+ id[4] = (byte)':';
+
+ System.arraycopy(bytes, 0, id, 6, 16);
+
+ SimpleString ssid = new SimpleString(id);
+
+ msg.getCoreMessage().setUserID(ssid);
+
+ msg.resetMessageID(null);
}
-
-// if (!disableMessageID)
-// {
-// // Generate an id
-//
-// UUID uid = UUIDGenerator.getInstance().generateUUID();
-//
-// byte[] bytes = uid.asBytes();
-//
-// long id1 = bytes[0] << 56 | bytes[1] << 48 |
-// bytes[2] << 40 |
-// bytes[3] << 32 |
-// bytes[4] << 24 |
-// bytes[5] << 16 |
-// bytes[6] << 8 |
-// bytes[7];
-//
-// long id2 = bytes[8] << 56 | bytes[9] << 48 |
-// bytes[10] << 40 |
-// bytes[11] << 32 |
-// bytes[12] << 24 |
-// bytes[13] << 16 |
-// bytes[14] << 8 |
-// bytes[15];
-//
-// //We store it as two longs in the message, as it's a more compact format
-// msg.getCoreMessage().setClientMessageID(id1, id2);
-//
-// msg.resetMessageID(null);
-// }
if (foreign)
{
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-03-31
17:35:13 UTC (rev 9036)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-03-31
17:57:19 UTC (rev 9037)
@@ -13,16 +13,15 @@
package org.hornetq.jms.management.impl;
-import java.util.List;
import java.util.Map;
import javax.management.MBeanInfo;
import javax.management.StandardMBean;
+import org.hornetq.api.core.FilterConstants;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.MessageCounterInfo;
import org.hornetq.api.core.management.QueueControl;
-import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.MBeanInfoHelper;
@@ -70,7 +69,7 @@
private static String createFilterForJMSMessageID(final String jmsMessageID) throws
Exception
{
- return HornetQMessage.JMSMESSAGEID_HEADER_NAME + " = '" +
jmsMessageID + "'";
+ return FilterConstants.HORNETQ_USERID + " = '" + jmsMessageID +
"'";
}
static String toJSON(final Map<String, Object>[] messages)
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-03-31
17:35:13 UTC (rev 9036)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-03-31
17:57:19 UTC (rev 9037)
@@ -886,6 +886,18 @@
}
+ public SimpleString getUserID()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void setUserID(SimpleString userID)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeFilter implements Filter