[hornetq-commits] JBoss hornetq SVN: r8662 - in trunk/src/main/org/hornetq: core/client and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 10 06:44:53 EST 2009
Author: timfox
Date: 2009-12-10 06:44:51 -0500 (Thu, 10 Dec 2009)
New Revision: 8662
Added:
trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
Modified:
trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
trunk/src/main/org/hornetq/core/client/ClientMessage.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
Log:
Refactored the Message interface to separate out internal methods into a new MessageInternal interface
Modified: trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -17,7 +17,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.utils.SimpleString;
/**
@@ -32,9 +32,9 @@
private final int limit;
- private final Message message;
+ private final MessageInternal message;
- public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer, final Message message)
+ public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer, final MessageInternal message)
{
super(buffer.channelBuffer());
Modified: trunk/src/main/org/hornetq/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/ClientMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -64,9 +64,8 @@
/**
* Sets the OutputStream that will receive the content of a message received in a non blocking way.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used when consuming large messages
*
- * @deprecated
* @throws HornetQException
*/
void setOutputStream(OutputStream out) throws HornetQException;
@@ -75,9 +74,8 @@
* Saves the content of the message to the OutputStream.
* It will block until the entire content is transfered to the OutputStream.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used for when consuming large messages
*
- * @deprecated
* @throws HornetQException
*/
void saveToOutputStream(OutputStream out) throws HornetQException;
@@ -85,22 +83,20 @@
/**
* Wait the outputStream completion of the message.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used when consuming large messages
*
* @param timeMilliseconds - 0 means wait forever
* @return true if it reached the end
* @throws HornetQException
-
- * @deprecated
+
*/
boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
/**
* Sets the body's IntputStream.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used when sending large messages
*
- * @deprecated
* @throws HornetQException
*/
void setBodyInputStream(InputStream bodyInputStream);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -15,6 +15,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.message.impl.MessageInternal;
/**
* A ClientMessageInternal
@@ -25,7 +26,7 @@
*
*
*/
-public interface ClientMessageInternal extends ClientMessage
+public interface ClientMessageInternal extends ClientMessage, MessageInternal
{
/** Size used for FlowControl */
int getFlowControlSize();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -23,6 +23,7 @@
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
@@ -201,18 +202,20 @@
private void doSend(final SimpleString address, final Message msg) throws HornetQException
{
+ MessageInternal msgI = (MessageInternal)msg;
+
ClientProducerCredits theCredits;
if (address != null)
{
- msg.setAddress(address);
+ msgI.setAddress(address);
// Anonymous
theCredits = session.getCredits(address);
}
else
{
- msg.setAddress(this.address);
+ msgI.setAddress(this.address);
theCredits = credits;
}
@@ -226,16 +229,16 @@
if (groupID != null)
{
- msg.putStringProperty(MessageImpl.HDR_GROUP_ID, groupID);
+ msgI.putStringProperty(MessageImpl.HDR_GROUP_ID, groupID);
}
- boolean sendBlocking = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
+ boolean sendBlocking = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
session.workDone();
boolean isLarge;
- if (msg.getBodyInputStream() != null || msg.isLargeMessage())
+ if (msgI.getBodyInputStream() != null || msgI.isLargeMessage())
{
isLarge = true;
}
@@ -246,19 +249,19 @@
if (isLarge)
{
- largeMessageSend(sendBlocking, msg, theCredits);
+ largeMessageSend(sendBlocking, msgI, theCredits);
}
else
{
- SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
+ SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking);
if (sendBlocking)
{
- channel.sendBlocking(message);
+ channel.sendBlocking(packet);
}
else
{
- channel.send(message);
+ channel.send(packet);
}
}
@@ -272,7 +275,7 @@
if (!isLarge)
{
- theCredits.acquireCredits(msg.getEncodeSize());
+ theCredits.acquireCredits(msgI.getEncodeSize());
}
}
catch (InterruptedException e)
@@ -291,12 +294,12 @@
// Methods to send Large Messages----------------------------------------------------------------
/**
- * @param msg
+ * @param msgI
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final Message msg, final ClientProducerCredits credits) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
{
- int headerSize = msg.getHeadersAndPropertiesEncodeSize();
+ int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
if (headerSize >= minLargeMessageSize)
{
@@ -305,14 +308,14 @@
}
// msg.getBody() could be Null on LargeServerMessage
- if (msg.getBodyInputStream() == null && msg.getWholeBuffer() != null)
+ if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null)
{
- msg.getWholeBuffer().readerIndex(0);
+ msgI.getWholeBuffer().readerIndex(0);
}
HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
- msg.encodeHeadersAndProperties(headerBuffer);
+ msgI.encodeHeadersAndProperties(headerBuffer);
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
@@ -320,13 +323,13 @@
try
{
- credits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+ credits.acquireCredits(msgI.getHeadersAndPropertiesEncodeSize());
}
catch (InterruptedException e)
{
}
- InputStream input = msg.getBodyInputStream();
+ InputStream input = msgI.getBodyInputStream();
if (input != null)
{
@@ -334,20 +337,20 @@
}
else
{
- largeMessageSendBuffered(sendBlocking, msg, credits);
+ largeMessageSendBuffered(sendBlocking, msgI, credits);
}
}
/**
* @param sendBlocking
- * @param msg
+ * @param msgI
* @throws HornetQException
*/
private void largeMessageSendBuffered(final boolean sendBlocking,
- final Message msg,
+ final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{
- BodyEncoder context = msg.getBodyEncoder();
+ BodyEncoder context = msgI.getBodyEncoder();
final long bodySize = context.getLargeBodySize();
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -18,7 +18,6 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -53,7 +52,6 @@
* If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a {@code boolean}),
* a PropertyConversionException will be thrown.
*
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">ClebertSuconic</a>
* @version <tt>$Revision: 3341 $</tt>
@@ -77,8 +75,6 @@
/**
* Sets the address to send this message to.
*
- * This method must not be called directly by HornetQ clients.
- *
* @param address address to send the message to
*/
void setAddress(SimpleString address);
@@ -169,43 +165,6 @@
*/
HornetQBuffer getBodyBuffer();
- // Should the following methods really be on the public API?
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void decodeFromBuffer(HornetQBuffer buffer);
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- int getEndOfMessagePosition();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- int getEndOfBodyPosition();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void checkCopy();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void bodyChanged();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void resetCopied();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- HornetQBuffer getEncodedBuffer();
-
// Properties
// ------------------------------------------------------------------
@@ -523,39 +482,8 @@
*/
Set<SimpleString> getPropertyNames();
- Map<String, Object> toMap();
-
- // FIXME - All this stuff is only necessary here for large messages - it should be refactored to be put in a better
- // place
-
/**
- * This method must not be called directly by HornetQ clients.
+ * @return Returns the message in Map form, useful when encoding to JSON
*/
- int getHeadersAndPropertiesEncodeSize();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- HornetQBuffer getWholeBuffer();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void encodeHeadersAndProperties(HornetQBuffer buffer);
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void decodeHeadersAndProperties(HornetQBuffer buffer);
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- BodyEncoder getBodyEncoder() throws HornetQException;
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- InputStream getBodyInputStream();
-
+ Map<String, Object> toMap();
}
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -25,7 +25,6 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
-import org.hornetq.core.message.Message;
import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.utils.DataConstants;
@@ -46,7 +45,7 @@
*
* $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
*/
-public abstract class MessageImpl implements Message
+public abstract class MessageImpl implements MessageInternal
{
// Constants -----------------------------------------------------
Added: trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java (rev 0)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.message.impl;
+
+import java.io.InputStream;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.BodyEncoder;
+import org.hornetq.core.message.Message;
+
+/**
+ * A MessageInternal
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * TODO - this can be refactored further to separate out large message specific stuff
+ *
+ *
+ */
+public interface MessageInternal extends Message
+{
+ void decodeFromBuffer(HornetQBuffer buffer);
+
+ int getEndOfMessagePosition();
+
+ int getEndOfBodyPosition();
+
+ void checkCopy();
+
+ void bodyChanged();
+
+ void resetCopied();
+
+ HornetQBuffer getEncodedBuffer();
+
+ int getHeadersAndPropertiesEncodeSize();
+
+ HornetQBuffer getWholeBuffer();
+
+ void encodeHeadersAndProperties(HornetQBuffer buffer);
+
+ void decodeHeadersAndProperties(HornetQBuffer buffer);
+
+ BodyEncoder getBodyEncoder() throws HornetQException;
+
+ InputStream getBodyInputStream();
+}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -15,6 +15,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
/**
* A MessagePacket
@@ -27,9 +28,9 @@
{
private static final Logger log = Logger.getLogger(MessagePacket.class);
- protected Message message;
+ protected MessageInternal message;
- public MessagePacket(final byte type, final Message message)
+ public MessagePacket(final byte type, final MessageInternal message)
{
super(type);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -16,7 +16,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.utils.DataConstants;
@@ -36,7 +36,7 @@
private int deliveryCount;
- public SessionReceiveMessage(final long consumerID, final Message message, final int deliveryCount)
+ public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount)
{
super(PacketImpl.SESS_RECEIVE_MSG, message);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -16,6 +16,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
@@ -39,7 +40,7 @@
// Constructors --------------------------------------------------
- public SessionSendMessage(final Message message, final boolean requiresResponse)
+ public SessionSendMessage(final MessageInternal message, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND, message);
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -14,7 +14,7 @@
package org.hornetq.core.server;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PagingStore;
/**
@@ -25,7 +25,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface ServerMessage extends Message, EncodingSupport
+public interface ServerMessage extends MessageInternal, EncodingSupport
{
void setMessageID(long id);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -700,12 +700,8 @@
public Object getObjectProperty(final String name) throws JMSException
{
- if (HornetQMessage.JMS_HORNETQ_INPUT_STREAM.equals(name))
+ if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
{
- return message.getBodyInputStream();
- }
- else if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
- {
return String.valueOf(message.getDeliveryCount());
}
More information about the hornetq-commits
mailing list