[hornetq-commits] JBoss hornetq SVN: r8663 - in trunk: src/main/org/hornetq/core/message and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 10 06:54:00 EST 2009
Author: ataylor
Date: 2009-12-10 06:53:59 -0500 (Thu, 10 Dec 2009)
New Revision: 8663
Modified:
trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java
trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java
trunk/src/main/org/hornetq/core/remoting/spi/Connection.java
trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java
trunk/src/main/org/hornetq/core/remoting/spi/Connector.java
trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
Log:
javadocs and some refactoring of message
Modified: trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
===================================================================
--- trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -13,6 +13,7 @@
package org.hornetq.core.example;
import java.util.Date;
+import java.util.HashMap;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
@@ -114,4 +115,12 @@
}
}
+ {
+ HashMap<String, Object> map = new HashMap<String, Object>();
+ map.put("host", "localhost");
+ map.put("port", 5445);
+ TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName(), map);
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(config);
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -166,10 +166,8 @@
HornetQBuffer getBodyBuffer();
// Properties
- // ------------------------------------------------------------------
+ // -----------------------------------------------------------------
- TypedProperties getProperties();
-
/**
* Puts a boolean property in this message.
*
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -792,11 +792,6 @@
return properties.getPropertyNames();
}
- public TypedProperties getProperties()
- {
- return properties;
- }
-
public HornetQBuffer getWholeBuffer()
{
return buffer;
@@ -815,6 +810,12 @@
// Private -------------------------------------------------------
+
+ private TypedProperties getProperties()
+ {
+ return properties;
+ }
+
// This must be synchronized as it can be called concurrently if the message is being delivered concurrently to
// many queues - the first caller in this case will actually encode it
private synchronized HornetQBuffer encodeToBuffer()
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -1255,9 +1255,9 @@
messageEncoding.decode(buff);
- if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
+ if (largeMessage.containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
{
- long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+ long originalMessageID = largeMessage.getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -151,9 +151,4 @@
return "invm:" + serverID;
}
- public void fail(final HornetQException me)
- {
- listener.connectionException(id, me);
- }
-
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -25,7 +25,15 @@
*/
public interface Acceptor extends HornetQComponent
{
+ /**
+ * Pause the acceptor and stop it from receiving client requests.
+ */
void pause();
+ /**
+ * Set the notification service for this acceptor to use.
+ *
+ * @param notificationService the notification service
+ */
void setNotificationService(NotificationService notificationService);
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -19,17 +19,38 @@
import java.util.concurrent.ScheduledExecutorService;
/**
+ * A factory for creating acceptors.
+ * <p/>
+ * An Acceptor is an endpoint that a {@link org.hornetq.core.remoting.spi.Connector} will connect to and is used by the remoting service.
+ *
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
public interface AcceptorFactory
{
+ /**
+ * Create a new instance of an Acceptor.
+ *
+ * @param configuration the configuration
+ * @param handler the handler
+ * @param listener the listener
+ * @param threadPool the threadpool
+ * @param scheduledThreadPool a scheduled thread pool
+ * @return an acceptor
+ */
Acceptor createAcceptor(final Map<String, Object> configuration,
BufferHandler handler,
ConnectionLifeCycleListener listener,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
+ /**
+ * Returns the allowable properties for this acceptor.
+ * <p/>
+ * This will differ between different acceptor implementations.
+ *
+ * @return the allowable properties.
+ */
Set<String> getAllowableProperties();
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -15,14 +15,29 @@
import org.hornetq.core.buffers.HornetQBuffer;
/**
- * A BufferHandler
- *
+ * A BufferHandler that will handle buffers received by an acceptor.
+ * <p/>
+ * The Buffer Handler will decode the buffer and take the appropriate action, typically forwarding to the correct channel.
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public interface BufferHandler
{
+ /**
+ * called by the remoting connection when a buffer is received.
+ *
+ * @param connectionID the connection the buffer was received on
+ * @param buffer the buffer to decode
+ */
void bufferReceived(Object connectionID, HornetQBuffer buffer);
+ /**
+ * called by the remoting connection prior to {@link org.hornetq.core.remoting.spi.BufferHandler#bufferReceived(Object, org.hornetq.core.buffers.HornetQBuffer)}.
+ * <p/>
+ * The implementation should return true if there is enough data in the buffer to decode, otherwise false.
+ *
+ * @param buffer the buffer
+ * @return true if the buffer can be decoded.
+ */
int isReadyToHandle(HornetQBuffer buffer);
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Connection.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Connection.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -17,25 +17,52 @@
import org.hornetq.core.exception.HornetQException;
/**
+ * The connection used by a channel to write data to.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
* @version <tt>$Revision$</tt>
- *
*/
public interface Connection
{
+ /**
+ * Create a new HornetQBuffer of the given size.
+ *
+ * @param size the size of buffer to create
+ * @return the new buffer.
+ */
HornetQBuffer createBuffer(int size);
+ /**
+ * returns the unique id of this wire.
+ *
+ * @return the id
+ */
Object getID();
+ /**
+ * writes the buffer to the wire.
+ *
+ * @param buffer the buffer to write
+ */
void write(HornetQBuffer buffer);
+ /**
+ * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
+ *
+ * @param buffer the buffer to write
+ * @param flush whether to flush the buffers onto the wire
+ */
void write(HornetQBuffer buffer, boolean flush);
+ /**
+ * closes this connection.
+ */
void close();
+ /**
+ * returns a string representation of the remote address this connection is connected to.
+ *
+ * @return the remote address
+ */
String getRemoteAddress();
-
- void fail(HornetQException me);
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -15,17 +15,32 @@
import org.hornetq.core.exception.HornetQException;
/**
+ * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
*
- * A ConnectionLifeCycleListener
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public interface ConnectionLifeCycleListener
{
+ /**
+ * called when a connection is created.
+ *
+ * @param connection the connection that has been created
+ */
void connectionCreated(Connection connection);
+ /**
+ * called when a connection is destroyed.
+ *
+ * @param connectionID the connection being destroyed.
+ */
void connectionDestroyed(Object connectionID);
+
+ /**
+ * called when an error occurs on the connection.
+ *
+ * @param connectionID the id of the connection.
+ * @param me the exception.
+ */
void connectionException(Object connectionID, HornetQException me);
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Connector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Connector.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Connector.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -13,26 +13,35 @@
package org.hornetq.core.remoting.spi;
/**
- *
- * A Connector
- *
+ * A Connector is used by the client for creating and controlling a connection.
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public interface Connector
{
+ /**
+ * starts the connector
+ */
void start();
+ /**
+ * closes the connector
+ */
void close();
+ /**
+ * returns true if the connector is started, otherwise false.
+ *
+ * @return true if the connector is started
+ */
boolean isStarted();
/**
* Create and return a connection from this connector.
- *
+ * <p/>
* This method must NOT throw an exception if it fails to create the connection
* (e.g. network is not available), in this case it MUST return null
- *
+ *
* @return The connection, or null if unable to create a connection (e.g. network is unavailable)
*/
Connection createConnection();
Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -19,14 +19,25 @@
import java.util.concurrent.ScheduledExecutorService;
/**
- *
- * A ConnectorFactory
- *
+ * A ConnectorFactory is used by the client for creating connectors.
+ * <p/>
+ * A Connector is used to connect to an {@link org.hornetq.core.remoting.spi.Acceptor}.
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public interface ConnectorFactory
{
+ /**
+ * creates a new instance of a connector.
+ *
+ * @param configuration the configuration
+ * @param handler the handler
+ * @param listener the listener
+ * @param closeExecutor the close executor
+ * @param threadPool the threadpool
+ * @param scheduledThreadPool the scheduled thread pool
+ * @return a new connector
+ */
Connector createConnector(Map<String, Object> configuration,
BufferHandler handler,
ConnectionLifeCycleListener listener,
@@ -34,5 +45,12 @@
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
+ /**
+ * Returns the allowable properties for this connector.
+ * <p/>
+ * This will differ between different connector implementations.
+ *
+ * @return the allowable properties.
+ */
Set<String> getAllowableProperties();
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -16,11 +16,7 @@
import static org.hornetq.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.core.management.NotificationType.CONSUMER_CREATED;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
@@ -761,8 +757,27 @@
binding.addConsumer(filterString);
// Need to propagate the consumer add
- Notification notification = new Notification(null, CONSUMER_CREATED, message.getProperties());
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+ Queue theQueue = (Queue)binding.getBindable();
+
+ props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+
+ if (filterString != null)
+ {
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+ }
+
+ Notification notification = new Notification(null, CONSUMER_CREATED, props);
+
managementService.sendNotification(notification);
}
@@ -796,8 +811,26 @@
binding.removeConsumer(filterString);
// Need to propagate the consumer close
- Notification notification = new Notification(null, CONSUMER_CLOSED, message.getProperties());
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+ Queue theQueue = (Queue)binding.getBindable();
+
+ props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+
+ if (filterString != null)
+ {
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+ }
+ Notification notification = new Notification(null, CONSUMER_CLOSED, props);
+
managementService.sendNotification(notification);
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -145,11 +145,6 @@
return channel.getRemoteAddress().toString();
}
- public void fail(final HornetQException me)
- {
- listener.connectionException(channel.getId(), me);
- }
-
// Public --------------------------------------------------------
@Override
Modified: trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -181,7 +181,7 @@
for (int i = start; i < end; i++)
{
ClientMessage msg = session.createMessage(true);
- msg.getProperties().putIntProperty(new SimpleString("key"), i);
+ msg.putIntProperty(new SimpleString("key"), i);
msg.getBodyBuffer().writeUTF("message " + i);
prod.send(msg);
}
More information about the hornetq-commits
mailing list