[hornetq-commits] JBoss hornetq SVN: r8662 - in trunk/src/main/org/hornetq: core/client and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 10 06:44:53 EST 2009


Author: timfox
Date: 2009-12-10 06:44:51 -0500 (Thu, 10 Dec 2009)
New Revision: 8662

Added:
   trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
Modified:
   trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
   trunk/src/main/org/hornetq/core/client/ClientMessage.java
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/message/Message.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
Log:
refactored message interface to separate out internal methods to messgeinternal interface

Modified: trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -17,7 +17,7 @@
 
 import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -32,9 +32,9 @@
 
    private final int limit;
 
-   private final Message message;
+   private final MessageInternal message;
 
-   public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer, final Message message)
+   public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer, final MessageInternal message)
    {
       super(buffer.channelBuffer());
 

Modified: trunk/src/main/org/hornetq/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientMessage.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/ClientMessage.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -64,9 +64,8 @@
    /** 
     * Sets the OutputStream that will receive the content of a message received in a non blocking way.
     * 
-    * This method is used for large message and is not meant to be called directly by HornetQ clients.
+    * This method is used when consuming large messages
     * 
-    * @deprecated 
     * @throws HornetQException
     */
    void setOutputStream(OutputStream out) throws HornetQException;
@@ -75,9 +74,8 @@
     * Saves the content of the message to the OutputStream.
     * It will block until the entire content is transfered to the OutputStream.
     *  
-    * This method is used for large message and is not meant to be called directly by HornetQ clients.
+    * This method is used for when consuming large messages
     * 
-    * @deprecated 
     * @throws HornetQException
     */
    void saveToOutputStream(OutputStream out) throws HornetQException;
@@ -85,22 +83,20 @@
    /**
     * Wait the outputStream completion of the message.
     *
-    * This method is used for large message and is not meant to be called directly by HornetQ clients.
+    * This method is used when consuming large messages
     * 
     * @param timeMilliseconds - 0 means wait forever
     * @return true if it reached the end
     * @throws HornetQException
-
-    * @deprecated 
+ 
     */
    boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
 
    /** 
     * Sets the body's IntputStream.
     * 
-    * This method is used for large message and is not meant to be called directly by HornetQ clients.
+    * This method is used when sending large messages
     * 
-    * @deprecated 
     * @throws HornetQException
     */
    void setBodyInputStream(InputStream bodyInputStream);

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -15,6 +15,7 @@
 
 import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.message.impl.MessageInternal;
 
 /**
  * A ClientMessageInternal
@@ -25,7 +26,7 @@
  *
  *
  */
-public interface ClientMessageInternal extends ClientMessage
+public interface ClientMessageInternal extends ClientMessage, MessageInternal
 {
    /** Size used for FlowControl */
    int getFlowControlSize();

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -23,6 +23,7 @@
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.Message;
 import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
@@ -201,18 +202,20 @@
 
    private void doSend(final SimpleString address, final Message msg) throws HornetQException
    {
+      MessageInternal msgI = (MessageInternal)msg;
+      
       ClientProducerCredits theCredits;
 
       if (address != null)
       {
-         msg.setAddress(address);
+         msgI.setAddress(address);
 
          // Anonymous
          theCredits = session.getCredits(address);
       }
       else
       {
-         msg.setAddress(this.address);
+         msgI.setAddress(this.address);
 
          theCredits = credits;
       }
@@ -226,16 +229,16 @@
 
       if (groupID != null)
       {
-         msg.putStringProperty(MessageImpl.HDR_GROUP_ID, groupID);
+         msgI.putStringProperty(MessageImpl.HDR_GROUP_ID, groupID);
       }
 
-      boolean sendBlocking = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
+      boolean sendBlocking = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
 
       session.workDone();
 
       boolean isLarge;
 
-      if (msg.getBodyInputStream() != null || msg.isLargeMessage())
+      if (msgI.getBodyInputStream() != null || msgI.isLargeMessage())
       {
          isLarge = true;
       }
@@ -246,19 +249,19 @@
 
       if (isLarge)
       {
-         largeMessageSend(sendBlocking, msg, theCredits);
+         largeMessageSend(sendBlocking, msgI, theCredits);
       }
       else
       {
-         SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
+         SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking);
 
          if (sendBlocking)
          {
-            channel.sendBlocking(message);
+            channel.sendBlocking(packet);
          }
          else
          {
-            channel.send(message);
+            channel.send(packet);
          }
       }
 
@@ -272,7 +275,7 @@
 
          if (!isLarge)
          {
-            theCredits.acquireCredits(msg.getEncodeSize());
+            theCredits.acquireCredits(msgI.getEncodeSize());
          }
       }
       catch (InterruptedException e)
@@ -291,12 +294,12 @@
    // Methods to send Large Messages----------------------------------------------------------------
 
    /**
-    * @param msg
+    * @param msgI
     * @throws HornetQException
     */
-   private void largeMessageSend(final boolean sendBlocking, final Message msg, final ClientProducerCredits credits) throws HornetQException
+   private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
    {
-      int headerSize = msg.getHeadersAndPropertiesEncodeSize();
+      int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
 
       if (headerSize >= minLargeMessageSize)
       {
@@ -305,14 +308,14 @@
       }
 
       // msg.getBody() could be Null on LargeServerMessage
-      if (msg.getBodyInputStream() == null && msg.getWholeBuffer() != null)
+      if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null)
       {
-         msg.getWholeBuffer().readerIndex(0);
+         msgI.getWholeBuffer().readerIndex(0);
       }
 
       HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
 
-      msg.encodeHeadersAndProperties(headerBuffer);
+      msgI.encodeHeadersAndProperties(headerBuffer);
 
       SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
 
@@ -320,13 +323,13 @@
 
       try
       {
-         credits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+         credits.acquireCredits(msgI.getHeadersAndPropertiesEncodeSize());
       }
       catch (InterruptedException e)
       {
       }
 
-      InputStream input = msg.getBodyInputStream();
+      InputStream input = msgI.getBodyInputStream();
 
       if (input != null)
       {
@@ -334,20 +337,20 @@
       }
       else
       {
-         largeMessageSendBuffered(sendBlocking, msg, credits);
+         largeMessageSendBuffered(sendBlocking, msgI, credits);
       }
    }
 
    /**
     * @param sendBlocking
-    * @param msg
+    * @param msgI
     * @throws HornetQException
     */
    private void largeMessageSendBuffered(final boolean sendBlocking,
-                                         final Message msg,
+                                         final MessageInternal msgI,
                                          final ClientProducerCredits credits) throws HornetQException
    {
-      BodyEncoder context = msg.getBodyEncoder();
+      BodyEncoder context = msgI.getBodyEncoder();
 
       final long bodySize = context.getLargeBodySize();
 

Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/message/Message.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -18,7 +18,6 @@
 import java.util.Set;
 
 import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.exception.HornetQException;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -53,7 +52,6 @@
  * If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a {@code boolean}),
  * a PropertyConversionException will be thrown.
  *
- * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">ClebertSuconic</a>
  * @version <tt>$Revision: 3341 $</tt>
@@ -77,8 +75,6 @@
    /**
     * Sets the address to send this message to.
     * 
-    * This method must not be called directly by HornetQ clients.
-    * 
     * @param address address to send the message to
     */
    void setAddress(SimpleString address);
@@ -169,43 +165,6 @@
     */
    HornetQBuffer getBodyBuffer();
 
-   // Should the following methods really be on the public API?
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   void decodeFromBuffer(HornetQBuffer buffer);
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   int getEndOfMessagePosition();
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   int getEndOfBodyPosition();
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   void checkCopy();
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   void bodyChanged();
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   void resetCopied();
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   HornetQBuffer getEncodedBuffer();
-
    // Properties
    // ------------------------------------------------------------------
 
@@ -523,39 +482,8 @@
     */
    Set<SimpleString> getPropertyNames();
 
-   Map<String, Object> toMap();
-
-   // FIXME - All this stuff is only necessary here for large messages - it should be refactored to be put in a better
-   // place
-
    /**
-    * This method must not be called directly by HornetQ clients.
+    * @return Returns the message in Map form, useful when encoding to JSON
     */
-   int getHeadersAndPropertiesEncodeSize();
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   HornetQBuffer getWholeBuffer();
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   void encodeHeadersAndProperties(HornetQBuffer buffer);
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   void decodeHeadersAndProperties(HornetQBuffer buffer);
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   BodyEncoder getBodyEncoder() throws HornetQException;
-
-   /**
-    * This method must not be called directly by HornetQ clients.
-    */
-   InputStream getBodyInputStream();
-
+   Map<String, Object> toMap();  
 }

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -25,7 +25,6 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
-import org.hornetq.core.message.Message;
 import org.hornetq.core.message.PropertyConversionException;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.utils.DataConstants;
@@ -46,7 +45,7 @@
  *
  * $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
  */
-public abstract class MessageImpl implements Message
+public abstract class MessageImpl implements MessageInternal
 {
    // Constants -----------------------------------------------------
 

Added: trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.message.impl;
+
+import java.io.InputStream;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.BodyEncoder;
+import org.hornetq.core.message.Message;
+
+/**
+ * A MessageInternal
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * TODO - this can be refactored further to separate out large message specific stuff
+ *
+ *
+ */
+public interface MessageInternal extends Message
+{
+   void decodeFromBuffer(HornetQBuffer buffer);
+
+   int getEndOfMessagePosition();
+
+   int getEndOfBodyPosition();
+
+   void checkCopy();
+
+   void bodyChanged();
+
+   void resetCopied();
+
+   HornetQBuffer getEncodedBuffer();
+   
+   int getHeadersAndPropertiesEncodeSize();
+
+   HornetQBuffer getWholeBuffer();
+
+   void encodeHeadersAndProperties(HornetQBuffer buffer);
+
+   void decodeHeadersAndProperties(HornetQBuffer buffer);
+
+   BodyEncoder getBodyEncoder() throws HornetQException;  
+   
+   InputStream getBodyInputStream();
+}

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -15,6 +15,7 @@
 
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
 
 /**
  * A MessagePacket
@@ -27,9 +28,9 @@
 {
    private static final Logger log = Logger.getLogger(MessagePacket.class);
 
-   protected Message message;
+   protected MessageInternal message;
 
-   public MessagePacket(final byte type, final Message message)
+   public MessagePacket(final byte type, final MessageInternal message)
    {
       super(type);
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -16,7 +16,7 @@
 import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.utils.DataConstants;
 
@@ -36,7 +36,7 @@
 
    private int deliveryCount;
 
-   public SessionReceiveMessage(final long consumerID, final Message message, final int deliveryCount)
+   public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount)
    {
       super(PacketImpl.SESS_RECEIVE_MSG, message);
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -16,6 +16,7 @@
 import org.hornetq.core.buffers.HornetQBuffer;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.utils.DataConstants;
@@ -39,7 +40,7 @@
 
    // Constructors --------------------------------------------------
 
-   public SessionSendMessage(final Message message, final boolean requiresResponse)
+   public SessionSendMessage(final MessageInternal message, final boolean requiresResponse)
    {
       super(PacketImpl.SESS_SEND, message);
 

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -14,7 +14,7 @@
 package org.hornetq.core.server;
 
 import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.paging.PagingStore;
 
 /**
@@ -25,7 +25,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public interface ServerMessage extends Message, EncodingSupport
+public interface ServerMessage extends MessageInternal, EncodingSupport
 {
    void setMessageID(long id);
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java	2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java	2009-12-10 11:44:51 UTC (rev 8662)
@@ -700,12 +700,8 @@
 
    public Object getObjectProperty(final String name) throws JMSException
    {
-      if (HornetQMessage.JMS_HORNETQ_INPUT_STREAM.equals(name))
+      if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
       {
-         return message.getBodyInputStream();
-      }
-      else if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
-      {
          return String.valueOf(message.getDeliveryCount());
       }
 



More information about the hornetq-commits mailing list