[hornetq-commits] JBoss hornetq SVN: r8663 - in trunk: src/main/org/hornetq/core/message and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 10 06:54:00 EST 2009


Author: ataylor
Date: 2009-12-10 06:53:59 -0500 (Thu, 10 Dec 2009)
New Revision: 8663

Modified:
   trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
   trunk/src/main/org/hornetq/core/message/Message.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
   trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
   trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java
   trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java
   trunk/src/main/org/hornetq/core/remoting/spi/Connection.java
   trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java
   trunk/src/main/org/hornetq/core/remoting/spi/Connector.java
   trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
   trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
Log:
javadocs and some refactoring of message

Modified: trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
===================================================================
--- trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -13,6 +13,7 @@
 package org.hornetq.core.example;
 
 import java.util.Date;
+import java.util.HashMap;
 
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
@@ -114,4 +115,12 @@
       }
    }
 
+   {
+      HashMap<String, Object> map = new HashMap<String, Object>();
+      map.put("host", "localhost");
+      map.put("port", 5445);
+      TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName(), map);
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(config);
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/message/Message.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -166,10 +166,8 @@
    HornetQBuffer getBodyBuffer();
 
    // Properties
-   // ------------------------------------------------------------------
+   // -----------------------------------------------------------------
 
-   TypedProperties getProperties();
-
    /**
     * Puts a boolean property in this message.
     * 

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -792,11 +792,6 @@
       return properties.getPropertyNames();
    }
 
-   public TypedProperties getProperties()
-   {
-      return properties;
-   }
-
    public HornetQBuffer getWholeBuffer()
    {
       return buffer;
@@ -815,6 +810,12 @@
 
    // Private -------------------------------------------------------
 
+
+   private TypedProperties getProperties()
+   {
+      return properties;
+   }
+   
    // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
    // many queues - the first caller in this case will actually encode it
    private synchronized HornetQBuffer encodeToBuffer()

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -1255,9 +1255,9 @@
 
       messageEncoding.decode(buff);
 
-      if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
+      if (largeMessage.containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
       {
-         long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+         long originalMessageID = largeMessage.getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
 
          LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -151,9 +151,4 @@
       return "invm:" + serverID;
    }
 
-   public void fail(final HornetQException me)
-   {
-      listener.connectionException(id, me);
-   }
-
 }

Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -25,7 +25,15 @@
  */
 public interface Acceptor extends HornetQComponent
 {
+   /**
+    * Pause the acceptor and stop it from receiving client requests.
+    */
    void pause();
 
+   /**
+    * Set the notification service for this acceptor to use.
+    *
+    * @param notificationService the notification service
+    */
    void setNotificationService(NotificationService notificationService);
 }

Modified: trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -19,17 +19,38 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
+ * A factory for creating acceptors.
+ * <p/>
+ * An Acceptor is an endpoint that a {@link org.hornetq.core.remoting.spi.Connector} will connect to and is used by the remoting service.
+ *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
 public interface AcceptorFactory
 {
+   /**
+    * Create a new instance of an Acceptor.
+    *
+    * @param configuration       the configuration
+    * @param handler             the handler
+    * @param listener            the listener
+    * @param threadPool          the threadpool
+    * @param scheduledThreadPool a scheduled thread pool
+    * @return an acceptor
+    */
    Acceptor createAcceptor(final Map<String, Object> configuration,
                            BufferHandler handler,
                            ConnectionLifeCycleListener listener,
                            Executor threadPool,
                            ScheduledExecutorService scheduledThreadPool);
 
+   /**
+    * Returns the allowable properties for this acceptor.
+    * <p/>
+    * This will differ between different acceptor implementations.
+    *
+    * @return the allowable properties.
+    */
    Set<String> getAllowableProperties();
 }

Modified: trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -15,14 +15,29 @@
 import org.hornetq.core.buffers.HornetQBuffer;
 
 /**
- * A BufferHandler
- * 
+ * A BufferHandler that will handle buffers received by an acceptor.
+ * <p/>
+ * The Buffer Handler will decode the buffer and take the appropriate action, typically forwarding to the correct channel.
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public interface BufferHandler
 {
+   /**
+    * called by the remoting connection when a buffer is received.
+    *
+    * @param connectionID the connection the buffer was received on
+    * @param buffer       the buffer to decode
+    */
    void bufferReceived(Object connectionID, HornetQBuffer buffer);
 
+   /**
+    * called by the remoting connection prior to {@link org.hornetq.core.remoting.spi.BufferHandler#bufferReceived(Object, org.hornetq.core.buffers.HornetQBuffer)}.
+    * <p/>
+    * The implementation should return true if there is enough data in the buffer to decode, otherwise false.
+    *
+    * @param buffer the buffer
+    * @return true if the buffer can be decoded.
+    */
    int isReadyToHandle(HornetQBuffer buffer);
 }

Modified: trunk/src/main/org/hornetq/core/remoting/spi/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Connection.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Connection.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -17,25 +17,52 @@
 import org.hornetq.core.exception.HornetQException;
 
 /**
+ * The connection used by a channel to write data to.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  * @version <tt>$Revision$</tt>
- *
  */
 public interface Connection
 {
+   /**
+    * Create a new HornetQBuffer of the given size.
+    *
+    * @param size the size of buffer to create
+    * @return the new buffer.
+    */
    HornetQBuffer createBuffer(int size);
 
+   /**
+    * returns the unique id of this wire.
+    *
+    * @return the id
+    */
    Object getID();
 
+   /**
+    * writes the buffer to the wire.
+    *
+    * @param buffer the buffer to write
+    */
    void write(HornetQBuffer buffer);
 
+   /**
+    * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
+    *
+    * @param buffer the buffer to write
+    * @param flush  whether to flush the buffers onto the wire
+    */
    void write(HornetQBuffer buffer, boolean flush);
 
+   /**
+    * closes this connection.
+    */
    void close();
 
+   /**
+    * returns a string representation of the remote address this connection is connected to.
+    *
+    * @return the remote address
+    */
    String getRemoteAddress();
-
-   void fail(HornetQException me);
 }
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -15,17 +15,32 @@
 import org.hornetq.core.exception.HornetQException;
 
 /**
+ * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
  *
- * A ConnectionLifeCycleListener
- *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public interface ConnectionLifeCycleListener
 {
+   /**
+    * called when a connection is created.
+    *
+    * @param connection the connection that has been created
+    */
    void connectionCreated(Connection connection);
 
+   /**
+    * called when a connection is destroyed.
+    *
+    * @param connectionID the connection being destroyed.
+    */
    void connectionDestroyed(Object connectionID);
 
+
+   /**
+    * called when an error occurs on the connection.
+    *
+    * @param connectionID the id of the connection.
+    * @param me           the exception.
+    */
    void connectionException(Object connectionID, HornetQException me);
 }

Modified: trunk/src/main/org/hornetq/core/remoting/spi/Connector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Connector.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Connector.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -13,26 +13,35 @@
 package org.hornetq.core.remoting.spi;
 
 /**
- * 
- * A Connector
- * 
+ * A Connector is used by the client for creating and controlling a connection.
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public interface Connector
 {
+   /**
+    * starts the connector
+    */
    void start();
 
+   /**
+    * closes the connector
+    */
    void close();
 
+   /**
+    * returns true if the connector is started, otherwise false.
+    *
+    * @return true if the connector is started
+    */
    boolean isStarted();
 
    /**
     * Create and return a connection from this connector.
-    * 
+    * <p/>
     * This method must NOT throw an exception if it fails to create the connection
     * (e.g. network is not available), in this case it MUST return null
-    * 
+    *
     * @return The connection, or null if unable to create a connection (e.g. network is unavailable)
     */
    Connection createConnection();

Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -19,14 +19,25 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
- * 
- * A ConnectorFactory
- * 
+ * A ConnectorFactory is used by the client for creating connectors.
+ * <p/>
+ * A Connector is used to connect to an {@link org.hornetq.core.remoting.spi.Acceptor}.
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public interface ConnectorFactory
 {
+   /**
+    * creates a new instance of a connector.
+    *
+    * @param configuration       the configuration
+    * @param handler             the handler
+    * @param listener            the listener
+    * @param closeExecutor       the close executor
+    * @param threadPool          the threadpool
+    * @param scheduledThreadPool the scheduled thread pool
+    * @return a new connector
+    */
    Connector createConnector(Map<String, Object> configuration,
                              BufferHandler handler,
                              ConnectionLifeCycleListener listener,
@@ -34,5 +45,12 @@
                              Executor threadPool,
                              ScheduledExecutorService scheduledThreadPool);
 
+   /**
+    * Returns the allowable properties for this connector.
+    * <p/>
+    * This will differ between different connector implementations.
+    *
+    * @return the allowable properties.
+    */
    Set<String> getAllowableProperties();
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -16,11 +16,7 @@
 import static org.hornetq.core.management.NotificationType.CONSUMER_CLOSED;
 import static org.hornetq.core.management.NotificationType.CONSUMER_CREATED;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -761,8 +757,27 @@
          binding.addConsumer(filterString);
 
          // Need to propagate the consumer add
-         Notification notification = new Notification(null, CONSUMER_CREATED, message.getProperties());
+         TypedProperties props = new TypedProperties();
 
+         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+         props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
+
+         props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+
+         props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+         Queue theQueue = (Queue)binding.getBindable();
+
+         props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+
+         if (filterString != null)
+         {
+            props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+         }
+
+         Notification notification = new Notification(null, CONSUMER_CREATED, props);
+
          managementService.sendNotification(notification);
       }
 
@@ -796,8 +811,26 @@
          binding.removeConsumer(filterString);
 
          // Need to propagate the consumer close
-         Notification notification = new Notification(null, CONSUMER_CLOSED, message.getProperties());
+         TypedProperties props = new TypedProperties();
 
+         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+         props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
+
+         props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+
+         props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+         Queue theQueue = (Queue)binding.getBindable();
+
+         props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+
+         if (filterString != null)
+         {
+            props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+         }
+         Notification notification = new Notification(null, CONSUMER_CLOSED, props);
+
          managementService.sendNotification(notification);
       }
 

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -145,11 +145,6 @@
       return channel.getRemoteAddress().toString();
    }
 
-   public void fail(final HornetQException me)
-   {
-      listener.connectionException(channel.getId(), me);
-   }
-
    // Public --------------------------------------------------------
 
    @Override

Modified: trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java	2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java	2009-12-10 11:53:59 UTC (rev 8663)
@@ -181,7 +181,7 @@
          for (int i = start; i < end; i++)
          {
             ClientMessage msg = session.createMessage(true);
-            msg.getProperties().putIntProperty(new SimpleString("key"), i);
+            msg.putIntProperty(new SimpleString("key"), i);
             msg.getBodyBuffer().writeUTF("message " + i);
             prod.send(msg);
          }



More information about the hornetq-commits mailing list