JBoss hornetq SVN: r10916 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 10:00:57 -0400 (Tue, 05 Jul 2011)
New Revision: 10916
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
remove unused import
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-07-05 14:00:12 UTC (rev 10915)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-07-05 14:00:57 UTC (rev 10916)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.concurrent.Executor;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
13 years, 5 months
JBoss hornetq SVN: r10915 - in trunk/hornetq-core/src/main/java/org/hornetq/api/core: management and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 10:00:12 -0400 (Tue, 05 Jul 2011)
New Revision: 10915
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java
Log:
Spell fixes and @link additions to Javadocs.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java 2011-07-04 10:46:18 UTC (rev 10914)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java 2011-07-05 14:00:12 UTC (rev 10915)
@@ -20,9 +20,9 @@
import org.hornetq.api.core.Message;
/**
- *
+ *
* A ClientMessage represents a message sent and/or received by HornetQ.
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -36,22 +36,20 @@
/**
* Set the delivery count for this message.
- *
+ *
* This method is not meant to be called by HornetQ clients.
- *
+ *
* @param deliveryCount message delivery count
*/
void setDeliveryCount(int deliveryCount);
/**
* Acknowledge reception of this message.
- *
- * If the session responsible to acknowledge this message has {@code autoCommitAcks}
- * set to {@code true}, the transaction will automatically commit the current transaction.
- * Otherwise, this acknwoledgement will not be committed until the client commits the session transaction.
- *
- * @throws HornetQException if an error occured while acknowledging the message.
- *
+ * <p>
+ * If the session responsible to acknowledge this message has {@code autoCommitAcks} set to
+ * {@code true}, the transaction will automatically commit the current transaction. Otherwise,
+ * this acknowledgement will not be committed until the client commits the session transaction.
+ * @throws HornetQException if an error occurred while acknowledging the message.
* @see ClientSession#isAutoCommitAcks()
*/
void acknowledge() throws HornetQException;
@@ -61,20 +59,20 @@
*/
int getBodySize();
- /**
+ /**
* Sets the OutputStream that will receive the content of a message received in a non blocking way.
- * <br>
+ * <br>
* This method is used when consuming large messages
- *
+ *
* @throws HornetQException
*/
void setOutputStream(OutputStream out) throws HornetQException;
- /**
+ /**
* Saves the content of the message to the OutputStream.
* It will block until the entire content is transfered to the OutputStream.
- * <br>
- *
+ * <br>
+ *
* @throws HornetQException
*/
void saveToOutputStream(OutputStream out) throws HornetQException;
@@ -83,18 +81,18 @@
* Wait the outputStream completion of the message.
*
* This method is used when consuming large messages
- *
+ *
* @param timeMilliseconds - 0 means wait forever
* @return true if it reached the end
* @throws HornetQException
*/
boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
- /**
- * Sets the body's IntputStream.
- * <br>
+ /**
+ * Sets the body's IntputStream.
+ * <br>
* This method is used when sending large messages
- *
+ *
* @throws HornetQException
*/
void setBodyInputStream(InputStream bodyInputStream);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java 2011-07-04 10:46:18 UTC (rev 10914)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java 2011-07-05 14:00:12 UTC (rev 10915)
@@ -16,20 +16,20 @@
import org.hornetq.api.core.Message;
/**
- * A SendAcknowledgementHandler notifies a client when an message sent asynchronously has been received by the server.
- * <br />
- * If the session is not blocking when sending durable or non-durable messages, the session can
- * set a SendAcknowledgementHandler to be notified later when the messages
- * has been received by the server. The method {@code sendAcknowledged} will be called with the message that
- * was sent asynchronously.
- *
+ * A SendAcknowledgementHandler notifies a client when an message sent asynchronously has been
+ * received by the server.
+ * <p>
+ * If the session is not blocking when sending durable or non-durable messages, the session can set
+ * a SendAcknowledgementHandler to be notified later when the messages has been received by the
+ * server. The method {@link #sendAcknowledged(Message)} will be called with the message that was
+ * sent asynchronously.
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public interface SendAcknowledgementHandler
{
/**
* Notifies the client that a message sent asynchronously has been received by the server.
- *
+ *
* @param message message sent asynchronously
*/
void sendAcknowledged(Message message);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java 2011-07-04 10:46:18 UTC (rev 10914)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java 2011-07-05 14:00:12 UTC (rev 10915)
@@ -31,19 +31,19 @@
* This method will disable any checks when a GarbageCollection happens
* leaving connections open. The JMS Layer will make specific usage of this
* method, since the ConnectionFactory.finalize should release this.
- *
+ *
* Warning: You may leave resources unattended if you call this method and
* don't take care of cleaning the resources yourself.
*/
void disableFinalizeCheck();
-
+
/**
* Create a ClientSessionFactory using whatever load balancing policy is in force
* @return The ClientSessionFactory
* @throws Exception
*/
ClientSessionFactory createSessionFactory() throws Exception;
-
+
/**
* Create a ClientSessionFactory to a specific server. The server must already be known about by this ServerLocator.
* This method allows the user to make a connection to a specific server bypassing any load balancing policy in force
@@ -52,39 +52,39 @@
* @throws Exception if a failure happened in creating the ClientSessionFactory or the ServerLocator does not know about the passed in transportConfiguration
*/
ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception;
-
+
/**
* Returns the period used to check if a client has failed to receive pings from the server.
- *
+ *
* Period is in milliseconds, default value is {@link HornetQClient#DEFAULT_CLIENT_FAILURE_CHECK_PERIOD}.
- *
+ *
* @return the period used to check if a client has failed to receive pings from the server
*/
long getClientFailureCheckPeriod();
/**
* Sets the period (in milliseconds) used to check if a client has failed to receive pings from the server.
- *
+ *
* Value must be -1 (to disable) or greater than 0.
- *
+ *
* @param clientFailureCheckPeriod the period to check failure
*/
void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
/**
* When <code>true</code>, consumers created through this factory will create temporary files to cache large messages.
- *
+ *
* There is 1 temporary file created for each large message.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_CACHE_LARGE_MESSAGE_CLIENT}.
- *
+ *
* @return <code>true</code> if consumers created through this factory will cache large messages in temporary files, <code>false</code> else
*/
boolean isCacheLargeMessagesClient();
/**
* Sets whether large messages received by consumers created through this factory will be cached in temporary files or not.
- *
+ *
* @param cached <code>true</code> to cache large messages in temporary files, <code>false</code> else
*/
void setCacheLargeMessagesClient(boolean cached);
@@ -92,73 +92,73 @@
/**
* Returns the connection <em>time-to-live</em>.
* This TTL determines how long the server will keep a connection alive in the absence of any data arriving from the client.
- *
+ *
* Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_CONNECTION_TTL}.
- *
+ *
* @return the connection time-to-live in milliseconds
*/
long getConnectionTTL();
/**
* Sets this factory's connections <em>time-to-live</em>.
- *
+ *
* Value must be -1 (to disable) or greater or equals to 0.
- *
+ *
* @param connectionTTL period in milliseconds
*/
void setConnectionTTL(long connectionTTL);
/**
* Returns the blocking calls timeout.
- *
+ *
* If client's blocking calls to the server take more than this timeout, the call will throw a {@link HornetQException} with the code {@link HornetQException#CONNECTION_TIMEDOUT}.
* Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_CALL_TIMEOUT}.
- *
+ *
* @return the blocking calls timeout
*/
long getCallTimeout();
/**
* Sets the blocking call timeout.
- *
+ *
* Value must be greater or equals to 0
- *
+ *
* @param callTimeout blocking call timeout in milliseconds
*/
void setCallTimeout(long callTimeout);
/**
* Returns the large message size threshold.
- *
+ *
* Messages whose size is if greater than this value will be handled as <em>large messages</em>.
- *
+ *
* Value is in bytes, default value is {@link HornetQClient#DEFAULT_MIN_LARGE_MESSAGE_SIZE}.
- *
+ *
* @return the message size threshold to treat messages as large messages.
*/
int getMinLargeMessageSize();
/**
* Sets the large message size threshold.
- *
+ *
* Value must be greater than 0.
- *
+ *
* @param minLargeMessageSize large message size threshold in bytes
*/
void setMinLargeMessageSize(int minLargeMessageSize);
/**
* Returns the window size for flow control of the consumers created through this factory.
- *
+ *
* Value is in bytes, default value is {@link HornetQClient#DEFAULT_CONSUMER_WINDOW_SIZE}.
- *
+ *
* @return the window size used for consumer flow control
*/
int getConsumerWindowSize();
/**
* Sets the window size for flow control of the consumers created through this factory.
- *
+ *
* Value must be -1 (to disable flow control), 0 (to not buffer any messages) or greater than 0 (to set the maximum size of the buffer)
*
* @param consumerWindowSize window size (in bytes) used for consumer flow control
@@ -167,37 +167,37 @@
/**
* Returns the maximum rate of message consumption for consumers created through this factory.
- *
+ *
* This value controls the rate at which a consumer can consume messages. A consumer will never consume messages at a rate faster than the rate specified.
- *
+ *
* Value is -1 (to disable) or a positive integer corresponding to the maximum desired message consumption rate specified in units of messages per second.
* Default value is {@link HornetQClient#DEFAULT_CONSUMER_MAX_RATE}.
- *
+ *
* @return the consumer max rate
*/
int getConsumerMaxRate();
/**
* Sets the maximum rate of message consumption for consumers created through this factory.
- *
+ *
* Value must -1 (to disable) or a positive integer corresponding to the maximum desired message consumption rate specified in units of messages per second.
- *
+ *
* @param consumerMaxRate maximum rate of message consumption (in messages per seconds)
*/
void setConsumerMaxRate(int consumerMaxRate);
/**
* Returns the size for the confirmation window of clients using this factory.
- *
+ *
* Value is in bytes or -1 (to disable the window). Default value is {@link HornetQClient#DEFAULT_CONFIRMATION_WINDOW_SIZE}.
- *
+ *
* @return the size for the confirmation window of clients using this factory
*/
int getConfirmationWindowSize();
/**
* Sets the size for the confirmation window buffer of clients using this factory.
- *
+ *
* Value must be -1 (to disable the window) or greater than 0.
* @param confirmationWindowSize size of the confirmation window (in bytes)
@@ -206,40 +206,39 @@
/**
* Returns the window size for flow control of the producers created through this factory.
- *
+ *
* Value must be -1 (to disable flow control) or greater than 0 to determine the maximum amount of bytes at any give time (to prevent overloading the connection).
* Default value is {@link HornetQClient#DEFAULT_PRODUCER_WINDOW_SIZE}.
- *
+ *
* @return the window size for flow control of the producers created through this factory.
*/
int getProducerWindowSize();
/**
- * Returns the window size for flow control of the producers created through this factory.
- *
- * Value must be -1 (to disable flow control) or greater than 0.
- *
- * @param producerWindowSize window size (in bytest) for flow control of the producers created through this factory.
+ * Returns the window size for flow control of the producers created through this factory. Value
+ * must be -1 (to disable flow control) or greater than 0.
+ * @param producerWindowSize window size (in bytes) for flow control of the producers created
+ * through this factory.
*/
void setProducerWindowSize(int producerWindowSize);
/**
* Returns the maximum rate of message production for producers created through this factory.
- *
+ *
* This value controls the rate at which a producer can produce messages. A producer will never produce messages at a rate faster than the rate specified.
- *
+ *
* Value is -1 (to disable) or a positive integer corresponding to the maximum desired message production rate specified in units of messages per second.
* Default value is {@link HornetQClient#DEFAULT_PRODUCER_MAX_RATE}.
- *
+ *
* @return maximum rate of message production (in messages per seconds)
*/
int getProducerMaxRate();
/**
* Sets the maximum rate of message production for producers created through this factory.
- *
+ *
* Value must -1 (to disable) or a positive integer corresponding to the maximum desired message production rate specified in units of messages per second.
- *
+ *
* @param producerMaxRate maximum rate of message production (in messages per seconds)
*/
void setProducerMaxRate(int producerMaxRate);
@@ -247,9 +246,9 @@
/**
* Returns whether consumers created through this factory will block while
* sending message acknowledgments or do it asynchronously.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_ACKNOWLEDGE}.
- *
+ *
* @return whether consumers will block while sending message
* acknowledgments or do it asynchronously
*/
@@ -258,7 +257,7 @@
/**
* Sets whether consumers created through this factory will block while
* sending message acknowledgments or do it asynchronously.
- *
+ *
* @param blockOnAcknowledge
* <code>true</code> to block when sending message
* acknowledgments or <code>false</code> to send them
@@ -271,7 +270,7 @@
* <br>
* If the session is configured to send durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
* to be notified once the message has been handled by the server.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_DURABLE_SEND}.
*
* @return whether producers will block while sending persistent messages or do it asynchronously
@@ -280,7 +279,7 @@
/**
* Sets whether producers created through this factory will block while sending <em>durable</em> messages or do it asynchronously.
- *
+ *
* @param blockOnDurableSend <code>true</code> to block when sending durable messages or <code>false</code> to send them asynchronously
*/
void setBlockOnDurableSend(boolean blockOnDurableSend);
@@ -290,7 +289,7 @@
* <br>
* If the session is configured to send non-durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
* to be notified once the message has been handled by the server.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_NON_DURABLE_SEND}.
*
* @return whether producers will block while sending non-durable messages or do it asynchronously
@@ -299,7 +298,7 @@
/**
* Sets whether producers created through this factory will block while sending <em>non-durable</em> messages or do it asynchronously.
- *
+ *
* @param blockOnNonDurableSend <code>true</code> to block when sending non-durable messages or <code>false</code> to send them asynchronously
*/
void setBlockOnNonDurableSend(boolean blockOnNonDurableSend);
@@ -307,11 +306,11 @@
/**
* Returns whether producers created through this factory will automatically
* assign a group ID to the messages they sent.
- *
+ *
* if <code>true</code>, a random unique group ID is created and set on each message for the property
* {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
* Default value is {@link HornetQClient#DEFAULT_AUTO_GROUP}.
- *
+ *
* @return whether producers will automatically assign a group ID to their messages
*/
boolean isAutoGroup();
@@ -319,23 +318,23 @@
/**
* Sets whether producers created through this factory will automatically
* assign a group ID to the messages they sent.
- *
+ *
* @param autoGroup <code>true</code> to automatically assign a group ID to each messages sent through this factory, <code>false</code> else
*/
void setAutoGroup(boolean autoGroup);
/**
* Returns the group ID that will be eventually set on each message for the property {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
- *
+ *
* Default value is is <code>null</code> and no group ID will be set on the messages.
- *
+ *
* @return the group ID that will be eventually set on each message
*/
String getGroupID();
-
+
/**
* Sets the group ID that will be set on each message sent through this factory.
- *
+ *
* @param groupID the group ID to use
*/
void setGroupID(String groupID);
@@ -351,7 +350,7 @@
* Sets to <code>true</code> to pre-acknowledge consumed messages on the
* server before they are sent to consumers, else set to <code>false</code>
* to let clients acknowledge the message they consume.
- *
+ *
* @param preAcknowledge
* <code>true</code> to enable pre-acknowledgment,
* <code>false</code> else
@@ -360,18 +359,18 @@
/**
* Returns the acknowledgments batch size.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_ACK_BATCH_SIZE}.
- *
+ *
* @return the acknowledgments batch size
*/
int getAckBatchSize();
/**
* Sets the acknowledgments batch size.
- *
+ *
* Value must be equal or greater than 0.
- *
+ *
* @param ackBatchSize
* acknowledgments batch size
*/
@@ -392,9 +391,9 @@
/**
* Returns whether this factory will use global thread pools (shared among all the factories in the same JVM)
* or its own pools.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_USE_GLOBAL_POOLS}.
- *
+ *
* @return <code>true</code> if this factory uses global thread pools, <code>false</code> else
*/
boolean isUseGlobalPools();
@@ -402,117 +401,117 @@
/**
* Sets whether this factory will use global thread pools (shared among all the factories in the same JVM)
* or its own pools.
- *
+ *
* @param useGlobalPools <code>true</code> to let this factory uses global thread pools, <code>false</code> else
*/
void setUseGlobalPools(boolean useGlobalPools);
/**
* Returns the maximum size of the scheduled thread pool.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE}.
- *
+ *
* @return the maximum size of the scheduled thread pool.
*/
int getScheduledThreadPoolMaxSize();
/**
* Sets the maximum size of the scheduled thread pool.
- *
+ *
* This setting is relevant only if this factory does not use global pools.
* Value must be greater than 0.
- *
+ *
* @param scheduledThreadPoolMaxSize maximum size of the scheduled thread pool.
*/
void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize);
/**
* Returns the maximum size of the thread pool.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_THREAD_POOL_MAX_SIZE}.
- *
+ *
* @return the maximum size of the thread pool.
*/
int getThreadPoolMaxSize();
/**
* Sets the maximum size of the thread pool.
- *
+ *
* This setting is relevant only if this factory does not use global pools.
* Value must be -1 (for unlimited thread pool) or greater than 0.
- *
+ *
* @param threadPoolMaxSize maximum size of the thread pool.
*/
void setThreadPoolMaxSize(int threadPoolMaxSize);
/**
- * Returns the time to retry connections created by this factory after failure.
- *
+ * Returns the time to retry connections created by this factory after failure.
+ *
* Value is in milliseconds, default is {@link HornetQClient#DEFAULT_RETRY_INTERVAL}.
- *
+ *
* @return the time to retry connections created by this factory after failure
*/
long getRetryInterval();
/**
* Sets the time to retry connections created by this factory after failure.
- *
+ *
* Value must be greater than 0.
- *
- * @param retryInterval time (in milliseconds) to retry connections created by this factory after failure
+ *
+ * @param retryInterval time (in milliseconds) to retry connections created by this factory after failure
*/
void setRetryInterval(long retryInterval);
/**
* Returns the multiplier to apply to successive retry intervals.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_RETRY_INTERVAL_MULTIPLIER}.
- *
+ *
* @return the multiplier to apply to successive retry intervals
*/
double getRetryIntervalMultiplier();
/**
* Sets the multiplier to apply to successive retry intervals.
- *
+ *
* Value must be positive.
- *
+ *
* @param retryIntervalMultiplier multiplier to apply to successive retry intervals
*/
void setRetryIntervalMultiplier(double retryIntervalMultiplier);
/**
* Returns the maximum retry interval (in the case a retry interval multiplier has been specified).
- *
+ *
* Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_MAX_RETRY_INTERVAL}.
- *
+ *
* @return the maximum retry interval
*/
long getMaxRetryInterval();
/**
* Sets the maximum retry interval.
- *
+ *
* Value must be greater than 0.
- *
+ *
* @param maxRetryInterval maximum retry interval to apply in the case a retry interval multiplier has been specified
*/
void setMaxRetryInterval(long maxRetryInterval);
/**
* Returns the maximum number of attempts to retry connection in case of failure.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_RECONNECT_ATTEMPTS}.
- *
+ *
* @return the maximum number of attempts to retry connection in case of failure.
*/
int getReconnectAttempts();
/**
* Sets the maximum number of attempts to retry connection in case of failure.
- *
+ *
* Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater than 0.
- *
+ *
* @param reconnectAttempts maximum number of attempts to retry connection in case of failure
*/
void setReconnectAttempts(int reconnectAttempts);
@@ -523,66 +522,66 @@
/**
* Returns true if the client will automatically attempt to connect to the backup server if the initial
* connection to the live server fails
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_FAILOVER_ON_INITIAL_CONNECTION}.
*/
boolean isFailoverOnInitialConnection();
-
+
/**
* Sets the value for FailoverOnInitialReconnection
- *
+ *
* @param failover
*/
void setFailoverOnInitialConnection(boolean failover);
/**
* Returns the class name of the connection load balancing policy.
- *
+ *
* Default value is "org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy".
- *
+ *
* @return the class name of the connection load balancing policy
*/
String getConnectionLoadBalancingPolicyClassName();
/**
* Sets the class name of the connection load balancing policy.
- *
+ *
* Value must be the name of a class implementing {@link ConnectionLoadBalancingPolicy}.
- *
+ *
* @param loadBalancingPolicyClassName class name of the connection load balancing policy
*/
void setConnectionLoadBalancingPolicyClassName(String loadBalancingPolicyClassName);
/**
* Returns the initial size of messages created through this factory.
- *
+ *
* Value is in bytes, default value is {@link HornetQClient#DEFAULT_INITIAL_MESSAGE_PACKET_SIZE}.
- *
+ *
* @return the initial size of messages created through this factory
*/
int getInitialMessagePacketSize();
/**
* Sets the initial size of messages created through this factory.
- *
+ *
* Value must be greater than 0.
- *
+ *
* @param size initial size of messages created through this factory.
*/
void setInitialMessagePacketSize(int size);
-
+
/**
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
- *
+ *
* @param interceptor an Interceptor
*/
void addInterceptor(Interceptor interceptor);
/**
* Removes an interceptor.
- *
+ *
* @param interceptor interceptor to remove
- *
+ *
* @return <code>true</code> if the interceptor is removed from this factory, <code>false</code> else
*/
boolean removeInterceptor(Interceptor interceptor);
@@ -593,9 +592,9 @@
void close();
boolean isHA();
-
+
boolean isCompressLargeMessage();
-
+
void setCompressLargeMessage(boolean compress);
void addClusterTopologyListener(ClusterTopologyListener listener);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java 2011-07-04 10:46:18 UTC (rev 10914)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java 2011-07-05 14:00:12 UTC (rev 10915)
@@ -13,12 +13,13 @@
package org.hornetq.api.core.management;
+import org.hornetq.core.server.cluster.Bridge;
+
/**
* A BridgeControl is used to manage a Bridge.
- *
+ * @see Bridge
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
*/
public interface BridgeControl extends HornetQComponentControl
{
@@ -76,9 +77,9 @@
* Returns whether this bridge is using duplicate detection.
*/
boolean isUseDuplicateDetection();
-
+
/**
- * Returns whether this bridge is using high availability
+ * Returns whether this bridge is using high availability
*/
boolean isHA();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java 2011-07-04 10:46:18 UTC (rev 10914)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java 2011-07-05 14:00:12 UTC (rev 10915)
@@ -13,9 +13,10 @@
package org.hornetq.api.core.management;
+import org.hornetq.core.server.Divert;
+
/**
- * A DivertControl is used to manage a divert.
- *
+ * A DivertControl is used to manage a {@link Divert}.
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
public interface DivertControl
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java 2011-07-04 10:46:18 UTC (rev 10914)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java 2011-07-05 14:00:12 UTC (rev 10915)
@@ -14,8 +14,7 @@
package org.hornetq.api.core.management;
/**
- * Types of notification emmitted by HornetQ servers.
- *
+ * Types of notification emitted by HornetQ servers.
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
public enum NotificationType
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java 2011-07-04 10:46:18 UTC (rev 10914)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java 2011-07-05 14:00:12 UTC (rev 10915)
@@ -23,7 +23,7 @@
/**
* Info for a MBean Operation.
- * <b>
+ * <p>
* This annotation is used only for methods which can be invoked
* through a GUI.
*
13 years, 5 months
JBoss hornetq SVN: r10914 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-04 06:46:18 -0400 (Mon, 04 Jul 2011)
New Revision: 10914
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Pass the executor to the replication manager.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-01 18:19:25 UTC (rev 10913)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-04 10:46:18 UTC (rev 10914)
@@ -317,41 +317,41 @@
private synchronized void initialise() throws Exception
{
- if (!readOnly)
+ if (readOnly)
{
- setThreadPools();
+ return;
+ }
- instantiateLoadBalancingPolicy();
+ setThreadPools();
+ instantiateLoadBalancingPolicy();
- if (discoveryGroupConfiguration != null)
+ if (discoveryGroupConfiguration != null)
+ {
+ InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
+ InetAddress lbAddress;
+
+ if (discoveryGroupConfiguration.getLocalBindAddress() != null)
{
- InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
+ lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
+ }
+ else
+ {
+ lbAddress = null;
+ }
- InetAddress lbAddress;
+ discoveryGroup =
+ new DiscoveryGroupImpl(nodeID,
+ discoveryGroupConfiguration.getName(),
+ lbAddress,
+ groupAddress,
+ discoveryGroupConfiguration.getGroupPort(),
+ discoveryGroupConfiguration.getRefreshTimeout());
- if (discoveryGroupConfiguration.getLocalBindAddress() != null)
- {
- lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
- }
- else
- {
- lbAddress = null;
- }
+ discoveryGroup.registerListener(this);
+ discoveryGroup.start();
+ }
- discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
-
- readOnly = true;
- }
+ readOnly = true;
}
private ServerLocatorImpl(final boolean useHA,
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 18:19:25 UTC (rev 10913)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-04 10:46:18 UTC (rev 10914)
@@ -79,7 +79,7 @@
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
- private ExecutorFactory executorFactory;
+ private final ExecutorFactory executorFactory;
private SessionFailureListener failureListener;
@@ -89,7 +89,10 @@
// Constructors --------------------------------------------------
- public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory, final ExecutorFactory executorFactory)
+ // XXX remove constructor once the other one is stable
+ @Deprecated
+ public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory,
+ final ExecutorFactory executorFactory)
{
super();
this.executorFactory = executorFactory;
@@ -101,8 +104,9 @@
/**
* @param remotingConnection
*/
- public ReplicationManagerImpl(CoreRemotingConnection remotingConnection)
+ public ReplicationManagerImpl(CoreRemotingConnection remotingConnection, final ExecutorFactory executorFactory)
{
+ this.executorFactory = executorFactory;
replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01 18:19:25 UTC (rev 10913)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-04 10:46:18 UTC (rev 10914)
@@ -1954,11 +1954,10 @@
System.out.println(HornetQServerImpl.class.getName() + " " + this.getIdentity() +
": create a ReplicationManagerImpl");
- replicationManager = new ReplicationManagerImpl(rc);
- System.out.println("rep.start()");
+ replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
- System.out.println("add RepMan to JournalStorageManager...");
journalStorageManager.setReplicator(replicationManager);
+ System.out.println("HornetQServerImpl: ReplicationManagerImpl is started & added to JournalStorageManager...");
}
}
13 years, 5 months
JBoss hornetq SVN: r10913 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-01 14:19:25 -0400 (Fri, 01 Jul 2011)
New Revision: 10913
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-01 16:00:44 UTC (rev 10912)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-01 18:19:25 UTC (rev 10913)
@@ -1748,13 +1748,15 @@
// TODO - this involves a scan - we should find a quicker way of doing it
MessageReference removed = queue.removeReferenceWithID(messageID);
-
- referencesToAck.add(removed);
-
+
if (removed == null)
{
log.warn("Failed to remove reference for " + messageID);
}
+ else
+ {
+ referencesToAck.add(removed);
+ }
break;
}
@@ -1888,6 +1890,7 @@
if (removed != null)
{
+ log.info("PUTZ Adding referencesToACK: " + removed);
referencesToAck.add(removed);
}
@@ -1895,6 +1898,8 @@
for (MessageReference ack : referencesToAck)
{
+ log.info("PUTZ Ack = " + ack);
+ log.info("PUTZ ACK.getQueue() = " + ack.getQueue());
ack.getQueue().reacknowledge(tx, ack);
}
13 years, 5 months
JBoss hornetq SVN: r10912 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 12:00:44 -0400 (Fri, 01 Jul 2011)
New Revision: 10912
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Don't use a blocking send, as the backup node is already up.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 16:00:03 UTC (rev 10911)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 16:00:44 UTC (rev 10912)
@@ -316,7 +316,7 @@
Channel systemChannel = remotingConnection.getChannel(CHANNEL_ID.SESSION.id, -1);
- systemChannel.sendBlocking(replicationStartPackage);
+ systemChannel.send(replicationStartPackage);
failureListener = new SessionFailureListener()
{
13 years, 5 months
JBoss hornetq SVN: r10911 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq: core/replication/impl and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 12:00:03 -0400 (Fri, 01 Jul 2011)
New Revision: 10911
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
Log:
HORNETQ-720 pass CoreRemotingConnection instead of channels, as it
allows us to use the failureListener.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01 16:00:03 UTC (rev 10911)
@@ -158,14 +158,10 @@
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
- long channelID = msg.getChannelID();
- Channel channelX = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
- Channel replicationChannel = rc.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- System.out.println("msg channelID: " + channelID);
System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
try
{
- server.addHaBackup(channelX, replicationChannel);
+ server.addHaBackup(rc);
}
catch (Exception e)
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01 16:00:03 UTC (rev 10911)
@@ -25,12 +25,10 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
-import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -115,25 +113,6 @@
break;
}
- case PacketImpl.HA_BACKUP_REGISTRATION:
- {
- HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
- System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
- long channelID = msg.getChannelID();
- Channel channelX = connection.getChannel(CHANNEL_ID.SESSION.id, -1);
- Channel replicationChannel = connection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
- try
- {
- server.addHaBackup(channelX, replicationChannel);
- }
- catch (Exception e)
- {
- // XXX This is not what we want
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
default:
{
HornetQPacketHandler.log.error("Invalid packet " + packet);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 16:00:03 UTC (rev 10911)
@@ -83,7 +83,7 @@
private SessionFailureListener failureListener;
- private final Channel systemChannel;
+ private CoreRemotingConnection remotingConnection;
// Static --------------------------------------------------------
@@ -95,21 +95,16 @@
this.executorFactory = executorFactory;
CoreRemotingConnection conn = sessionFactory.getConnection();
- systemChannel = conn.getChannel(CHANNEL_ID.SESSION.id, -1);
replicatingChannel = conn.getChannel(CHANNEL_ID.REPLICATION.id, -1);
}
- // Public --------------------------------------------------------
-
/**
- * @param systemChannel
- * @param replicatingChannel
+ * @param remotingConnection
*/
- public ReplicationManagerImpl(Channel systemChannel, Channel replicatingChannel)
+ public ReplicationManagerImpl(CoreRemotingConnection remotingConnection)
{
- super();
- this.systemChannel = systemChannel;
- this.replicatingChannel = replicatingChannel;
+ replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ this.remotingConnection = remotingConnection;
}
public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
@@ -314,20 +309,13 @@
throw new IllegalStateException("ReplicationManager is already started");
}
-// replicatingConnection = sessionFactory.getConnection();
-//
-// if (replicatingConnection == null)
-// {
-// ReplicationManagerImpl.log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
-// throw new HornetQException(HornetQException.ILLEGAL_STATE,
-// "Backup server MUST be started before live server. Initialisation will not proceed.");
-// }
-
replicatingChannel.setHandler(responseHandler);
CreateReplicationSessionMessage replicationStartPackage =
new CreateReplicationSessionMessage(replicatingChannel.getID());
+ Channel systemChannel = remotingConnection.getChannel(CHANNEL_ID.SESSION.id, -1);
+
systemChannel.sendBlocking(replicationStartPackage);
failureListener = new SessionFailureListener()
@@ -358,10 +346,9 @@
{
}
};
- // sessionFactory.addFailureListener(failureListener);
+ remotingConnection.addFailureListener(failureListener);
started = true;
-
enabled = true;
}
@@ -397,15 +384,14 @@
replicatingChannel.close();
}
+ remotingConnection.removeFailureListener(failureListener);
// sessionFactory.causeExit();
-// sessionFactory.removeFailureListener(failureListener);
// if (replicatingConnection != null)
// {
// replicatingConnection.destroy();
// }
//
-// replicatingConnection = null;
-
+ remotingConnection = null;
started = false;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01 16:00:03 UTC (rev 10911)
@@ -28,6 +28,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
@@ -176,5 +177,5 @@
void stop(boolean failoverOnServerShutdown) throws Exception;
- void addHaBackup(Channel channelX, Channel replicationChannel) throws Exception;
+ void addHaBackup(CoreRemotingConnection rc) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01 16:00:03 UTC (rev 10911)
@@ -81,6 +81,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.remoting.server.RemotingService;
@@ -547,7 +548,7 @@
}
log.info("announce backup to live-server (id=" + liveConnectorName + ")");
liveServerSessionFactory.getConnection()
- .getChannel(CHANNEL_ID.SESSION.id, -1)
+ .getChannel(CHANNEL_ID.PING.id, -1)
.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
log.info("backup registered");
@@ -1945,19 +1946,15 @@
}
@Override
- public void addHaBackup(Channel systemChannel, Channel replicatingChannel) throws Exception
+ public void addHaBackup(CoreRemotingConnection rc) throws Exception
{
if (!(storageManager instanceof JournalStorageManager))
return;
JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
System.out.println(HornetQServerImpl.class.getName() + " " + this.getIdentity() +
- ": create a ReplicationManagerImpl. Using ChannelID=" + systemChannel);
- // XXX not sure this is the right call to use
-// final ServerLocatorInternal serverLocator =
-// (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(connector);
-// ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
- replicationManager = new ReplicationManagerImpl(systemChannel, replicatingChannel);
+ ": create a ReplicationManagerImpl");
+ replicationManager = new ReplicationManagerImpl(rc);
System.out.println("rep.start()");
replicationManager.start();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java 2011-07-01 13:31:52 UTC (rev 10910)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java 2011-07-01 16:00:03 UTC (rev 10911)
@@ -43,17 +43,15 @@
long getCreationTime();
/**
- * returns a string representation of the remote address of this connection
- *
+ * Returns a string representation of the remote address of this connection.
* @return the remote address
*/
String getRemoteAddress();
/**
- * add a failure listener.
+ * Adds a failure listener.
* <p/>
* The listener will be called in the event of connection failure.
- *
* @param listener the listener
*/
void addFailureListener(FailureListener listener);
@@ -82,27 +80,27 @@
* @return true if removed
*/
boolean removeCloseListener(CloseListener listener);
-
+
List<CloseListener> removeCloseListeners();
-
+
void setCloseListeners(List<CloseListener> listeners);
-
-
+
+
/**
* return all the failure listeners
*
* @return the listeners
*/
List<FailureListener> getFailureListeners();
-
+
List<FailureListener> removeFailureListeners();
/**
- * set the failure listeners.
+ * Sets the failure listeners.
* <p/>
- * These will be called in the event of the connection being closed. Any previosuly added listeners will be removed.
- *
+ * These will be called in the event of the connection being closed. Any previously added
+ * listeners will be removed.
* @param listeners the listeners to add.
*/
void setFailureListeners(List<FailureListener> listeners);
@@ -146,20 +144,20 @@
*
* @return true if destroyed, otherwise false
*/
- boolean isDestroyed();
-
+ boolean isDestroyed();
+
/**
* Disconnect the connection, closing all channels
*/
void disconnect();
-
+
/**
* returns true if any data has been received since the last time this method was called.
*
* @return true if data has been received.
*/
boolean checkDataReceived();
-
+
/**
* flush all outstanding data from the connection.
*/
13 years, 5 months
JBoss hornetq SVN: r10910 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 5 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 09:31:52 -0400 (Fri, 01 Jul 2011)
New Revision: 10910
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Still with non-functional replication
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -53,7 +53,6 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.JournalReaderCallback;
@@ -98,13 +97,12 @@
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUID;
import org.hornetq.utils.XidCodecSupport;
/**
- *
+ *
* A JournalStorageManager
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
@@ -155,15 +153,13 @@
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
- private UUID persistentID;
-
private final BatchingIDGenerator idGenerator;
- private final ReplicationManager replicator;
+ private ReplicationManager replicator;
- private final Journal messageJournal;
+ private Journal messageJournal;
- private final Journal bindingsJournal;
+ private Journal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
@@ -330,6 +326,17 @@
return replicator != null;
}
+ public void setReplicator(ReplicationManager replicationManager)
+ {
+ assert replicationManager != null;
+ replicator = replicationManager;
+ Journal localMessageJournal = messageJournal;
+ Journal localBindingsJournal = bindingsJournal;
+ bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+ // XXX HORNETQ-720 obviously missing here is the synchronization step.
+ }
+
public void waitOnOperations() throws Exception
{
if (!started)
@@ -340,9 +347,6 @@
waitOnOperations(0);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
- */
public boolean waitOnOperations(final long timeout) throws Exception
{
if (!started)
@@ -756,11 +760,11 @@
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
-
+
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.UPDATE_DELIVERY_COUNT,
updateInfo,
-
+
syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -1137,19 +1141,19 @@
continue;
}
-
+
// Redistribution could install a Redistributor while we are still loading records, what will be an issue with prepared ACKs
// We make sure te Queue is paused before we reroute values.
queue.pause();
Collection<AddMessageRecord> valueRecords = queueRecords.values();
-
+
long currentTime = System.currentTimeMillis();
for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
-
+
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime)
{
scheduledDeliveryTime = 0;
@@ -1224,7 +1228,7 @@
{
messageJournal.perfBlast(perfBlastPages);
}
-
+
for (Queue queue : queues.values())
{
queue.resume();
@@ -1442,8 +1446,8 @@
return bindingsInfo;
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#lineUpContext()
*/
@@ -2990,6 +2994,7 @@
this.refEncoding = refEncoding;
}
+ @Override
public String toString()
{
return "AddRef;" + refEncoding;
@@ -3006,6 +3011,7 @@
this.refEncoding = refEncoding;
}
+ @Override
public String toString()
{
return "ACK;" + refEncoding;
@@ -3022,6 +3028,7 @@
Message msg;
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -29,6 +29,7 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
@@ -75,7 +76,7 @@
: null,
server.getNodeID());
- Channel channel1 = rc.getChannel(1, -1);
+ Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
ChannelHandler handler = new HornetQPacketHandler(this, server, channel1, rc);
@@ -90,7 +91,7 @@
final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
- final Channel channel0 = rc.getChannel(0, -1);
+ final Channel channel0 = rc.getChannel(CHANNEL_ID.PING.id, -1);
channel0.setHandler(new ChannelHandler()
{
@@ -157,10 +158,14 @@
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
+ long channelID = msg.getChannelID();
+ Channel channelX = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
+ Channel replicationChannel = rc.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ System.out.println("msg channelID: " + channelID);
System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
try
{
- server.addHaBackup(msg.getConnector());
+ server.addHaBackup(channelX, replicationChannel);
}
catch (Exception e)
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -25,10 +25,12 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -113,6 +115,25 @@
break;
}
+ case PacketImpl.HA_BACKUP_REGISTRATION:
+ {
+ HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+ System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
+ long channelID = msg.getChannelID();
+ Channel channelX = connection.getChannel(CHANNEL_ID.SESSION.id, -1);
+ Channel replicationChannel = connection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
+ try
+ {
+ server.addHaBackup(channelX, replicationChannel);
+ }
+ catch (Exception e)
+ {
+ // XXX This is not what we want
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
default:
{
HornetQPacketHandler.log.error("Invalid packet " + packet);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -21,12 +21,13 @@
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.server.HornetQComponent;
/**
+ * Used by the {@link JournalStorageManager} to update the replicated journal.
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public interface ReplicationManager extends HornetQComponent
{
@@ -79,7 +80,7 @@
/**
* @param journalInfo
- * @throws HornetQException
+ * @throws HornetQException
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -30,7 +30,7 @@
import org.hornetq.core.replication.ReplicationManager;
/**
- * Used by the {@link JournalStorageManager} to replicate journal calls.
+ * Used by the {@link JournalStorageManager} to replicate journal calls.
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -59,10 +59,11 @@
private final byte journalID;
- public ReplicatedJournal(final byte journaID, final Journal localJournal, final ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journalID, final Journal localJournal,
+ final ReplicationManager replicationManager)
{
super();
- journalID = journaID;
+ this.journalID = journalID;
this.localJournal = localJournal;
this.replicationManager = replicationManager;
}
@@ -201,10 +202,10 @@
}
replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
-
+
}
-
+
/**
* @param id
* @param sync
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -33,6 +33,7 @@
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
@@ -52,7 +53,7 @@
/**
* A ReplicationManagerImpl
- *
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public class ReplicationManagerImpl implements ReplicationManager
@@ -65,12 +66,11 @@
private final ResponseHandler responseHandler = new ResponseHandler();
- private final ClientSessionFactoryInternal sessionFactory;
+// private final ClientSessionFactoryInternal sessionFactory;
+// private CoreRemotingConnection replicatingConnection;
- private CoreRemotingConnection replicatingConnection;
+ private final Channel replicatingChannel;
- private Channel replicatingChannel;
-
private boolean started;
private volatile boolean enabled;
@@ -79,10 +79,12 @@
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
- private final ExecutorFactory executorFactory;
+ private ExecutorFactory executorFactory;
private SessionFailureListener failureListener;
+ private final Channel systemChannel;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -90,15 +92,25 @@
public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory, final ExecutorFactory executorFactory)
{
super();
- this.sessionFactory = sessionFactory;
this.executorFactory = executorFactory;
+
+ CoreRemotingConnection conn = sessionFactory.getConnection();
+ systemChannel = conn.getChannel(CHANNEL_ID.SESSION.id, -1);
+ replicatingChannel = conn.getChannel(CHANNEL_ID.REPLICATION.id, -1);
}
// Public --------------------------------------------------------
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
+ /**
+ * @param systemChannel
+ * @param replicatingChannel
*/
+ public ReplicationManagerImpl(Channel systemChannel, Channel replicatingChannel)
+ {
+ super();
+ this.systemChannel = systemChannel;
+ this.replicatingChannel = replicatingChannel;
+ }
public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
{
@@ -302,26 +314,21 @@
throw new IllegalStateException("ReplicationManager is already started");
}
- replicatingConnection = sessionFactory.getConnection();
+// replicatingConnection = sessionFactory.getConnection();
+//
+// if (replicatingConnection == null)
+// {
+// ReplicationManagerImpl.log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+// throw new HornetQException(HornetQException.ILLEGAL_STATE,
+// "Backup server MUST be started before live server. Initialisation will not proceed.");
+// }
- if (replicatingConnection == null)
- {
- ReplicationManagerImpl.log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Backup server MUST be started before live server. Initialisation will not proceed.");
- }
-
- long channelID = replicatingConnection.generateChannelID();
-
- Channel mainChannel = replicatingConnection.getChannel(1, -1);
-
- replicatingChannel = replicatingConnection.getChannel(channelID, -1);
-
replicatingChannel.setHandler(responseHandler);
- CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID);
+ CreateReplicationSessionMessage replicationStartPackage =
+ new CreateReplicationSessionMessage(replicatingChannel.getID());
- mainChannel.sendBlocking(replicationStartPackage);
+ systemChannel.sendBlocking(replicationStartPackage);
failureListener = new SessionFailureListener()
{
@@ -351,7 +358,7 @@
{
}
};
- sessionFactory.addFailureListener(failureListener);
+ // sessionFactory.addFailureListener(failureListener);
started = true;
@@ -390,15 +397,15 @@
replicatingChannel.close();
}
- sessionFactory.causeExit();
- sessionFactory.removeFailureListener(failureListener);
- if (replicatingConnection != null)
- {
- replicatingConnection.destroy();
- }
+// sessionFactory.causeExit();
+// sessionFactory.removeFailureListener(failureListener);
+// if (replicatingConnection != null)
+// {
+// replicatingConnection.destroy();
+// }
+//
+// replicatingConnection = null;
- replicatingConnection = null;
-
started = false;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -20,7 +20,6 @@
import javax.management.MBeanServer;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
@@ -177,9 +176,5 @@
void stop(boolean failoverOnServerShutdown) throws Exception;
- /**
- * @param connector
- * @throws Exception
- */
- void addHaBackup(TransportConfiguration connector) throws Exception;
+ void addHaBackup(Channel channelX, Channel replicationChannel) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -44,7 +44,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
@@ -82,6 +81,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
@@ -123,7 +123,6 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -214,8 +213,6 @@
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
- private final Set<String> sharedNothingBackups = new ConcurrentHashSet<String>();
-
private final Object initialiseLock = new Object();
private boolean initialised;
@@ -517,7 +514,7 @@
}
}
- private class SharedNothingBackupActivation implements Activation
+ private final class SharedNothingBackupActivation implements Activation
{
public void run()
{
@@ -550,11 +547,10 @@
}
log.info("announce backup to live-server (id=" + liveConnectorName + ")");
liveServerSessionFactory.getConnection()
- .getChannel(0, -1)
+ .getChannel(CHANNEL_ID.SESSION.id, -1)
.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
log.info("backup registered");
-
started = true;
log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
@@ -639,7 +635,8 @@
}
else
{
- // Replicated
+ assert replicationEndpoint == null;
+ replicationEndpoint = new ReplicationEndpointImpl(this);
activation = new SharedNothingBackupActivation();
}
@@ -993,14 +990,12 @@
public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
{
- if (configuration.isBackup())
+ if (!configuration.isBackup())
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is a backup server");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server " +
+ getIdentity());
}
- assert replicationEndpoint == null;
- replicationEndpoint = new ReplicationEndpointImpl(this);
-
if (replicationEndpoint == null)
System.err.println("endpoint is null!");
@@ -1262,23 +1257,23 @@
// Private
// --------------------------------------------------------------------------------------
- private boolean startReplication(TransportConfiguration connector) throws Exception
- {
- assert !configuration.isSharedStore();
- if (configuration.isSharedStore() || connector == null)
- {
- return true;
- }
+// private boolean startReplication(TransportConfiguration connector) throws Exception
+// {
+// assert !configuration.isSharedStore();
+// if (configuration.isSharedStore() || connector == null)
+// {
+// return true;
+// }
+//
+// serverLocator = HornetQClient.createServerLocatorWithHA(connector);
+// ClientSessionFactoryInternal replicationFailoverManager =
+// (ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
+// replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
+// replicationManager.start();
+//
+// return true;
+// }
- serverLocator = HornetQClient.createServerLocatorWithHA(connector);
- ClientSessionFactoryInternal replicationFailoverManager =
- (ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
- replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
- replicationManager.start();
-
- return true;
- }
-
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1950,11 +1945,23 @@
}
@Override
- public void addHaBackup(TransportConfiguration connector) throws Exception
+ public void addHaBackup(Channel systemChannel, Channel replicatingChannel) throws Exception
{
- log.info(connector + " " + connector.getFactoryClassName() + " " + connector.getParams() + " " +
- replicationManager);
- startReplication(connector);
- // throw new UnsupportedOperationException("unimplemented");
+ if (!(storageManager instanceof JournalStorageManager))
+ return;
+ JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
+
+ System.out.println(HornetQServerImpl.class.getName() + " " + this.getIdentity() +
+ ": create a ReplicationManagerImpl. Using ChannelID=" + systemChannel);
+ // XXX not sure this is the right call to use
+// final ServerLocatorInternal serverLocator =
+// (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(connector);
+// ClientSessionFactoryInternal sessionFactory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
+ replicationManager = new ReplicationManagerImpl(systemChannel, replicatingChannel);
+ System.out.println("rep.start()");
+ replicationManager.start();
+
+ System.out.println("add RepMan to JournalStorageManager...");
+ journalStorageManager.setReplicator(replicationManager);
}
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-01 13:30:25 UTC (rev 10909)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-01 13:31:52 UTC (rev 10910)
@@ -169,8 +169,7 @@
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_backup");
config1.getAcceptorConfigurations().clear();
config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- TransportConfiguration tc = getConnectorTransportConfiguration(true);
- config1.getConnectorConfigurations().put(LIVE_NODE_NAME, tc);
+ config1.getConnectorConfigurations().put(LIVE_NODE_NAME, getConnectorTransportConfiguration(true));
//liveConfig.setBackupConnectorName("toBackup");
config1.setSecurityEnabled(false);
13 years, 5 months
JBoss hornetq SVN: r10909 - branches/HORNETQ-720_Replication/etc.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 09:30:25 -0400 (Fri, 01 Jul 2011)
New Revision: 10909
Modified:
branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
Log:
Adjust eclipse formatting.
Modified: branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
===================================================================
--- branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-07-01 13:30:09 UTC (rev 10908)
+++ branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-07-01 13:30:25 UTC (rev 10909)
@@ -1,4 +1,4 @@
-#Mon Jun 27 17:03:43 CEST 2011
+#Fri Jul 01 11:51:40 CEST 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.codeComplete.argumentPrefixes=
org.eclipse.jdt.core.codeComplete.argumentSuffixes=
@@ -46,7 +46,7 @@
org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=82
org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=1
+org.eclipse.jdt.core.formatter.blank_lines_before_field=0
org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
@@ -68,7 +68,7 @@
org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=next_line
org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=true
org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=true
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
+org.eclipse.jdt.core.formatter.comment.format_block_comments=true
org.eclipse.jdt.core.formatter.comment.format_header=false
org.eclipse.jdt.core.formatter.comment.format_html=true
org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=true
@@ -76,7 +76,7 @@
org.eclipse.jdt.core.formatter.comment.format_source_code=true
org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
+org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=do not insert
org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=do not insert
org.eclipse.jdt.core.formatter.comment.line_length=100
org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries=true
13 years, 5 months
JBoss hornetq SVN: r10908 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 09:30:09 -0400 (Fri, 01 Jul 2011)
New Revision: 10908
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
Log:
clean up
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-01 12:39:30 UTC (rev 10907)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-01 13:30:09 UTC (rev 10908)
@@ -202,7 +202,7 @@
closeExecutor = orderedExecutorFactory.getExecutor();
this.interceptors = interceptors;
-
+
}
public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
@@ -377,7 +377,7 @@
sessions.remove(session);
}
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -418,7 +418,8 @@
return;
}
- // we need to stopthe factory from connecting if it is in the middle aof trying to failover before we get the lock
+ // we need to stop the factory from connecting if it is in the middle aof trying to failover
+ // before we get the lock
causeExit();
synchronized (createSessionLock)
{
@@ -461,7 +462,7 @@
{
stopPingingAfterOne = true;
}
-
+
public void resumePinging()
{
stopPingingAfterOne = false;
@@ -866,7 +867,7 @@
{
sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
}
-
+
for (ClientSessionInternal session : sessionsToFailover)
{
session.handleFailover(connection);
@@ -902,7 +903,7 @@
if (reconnectAttempts != 0)
{
count++;
-
+
if (reconnectAttempts != -1 && count == reconnectAttempts)
{
log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up on reconnecting it.");
@@ -1011,7 +1012,7 @@
{
log.debug("Trying to connect at the main server using connector :" + connectorConfig);
}
-
+
tc = connector.createConnection();
if (tc == null)
@@ -1020,7 +1021,7 @@
{
log.debug("Main server is not up. Hopefully there's a backup configured now!");
}
-
+
try
{
connector.close();
@@ -1058,7 +1059,7 @@
{
log.debug("Backup is not active yet");
}
-
+
try
{
connector.close();
@@ -1072,12 +1073,12 @@
else
{
/*looks like the backup is now live, lets use that*/
-
+
if (log.isDebugEnabled())
{
log.debug("Connected to the backup at " + backupConfig);
}
-
+
connectorConfig = backupConfig;
backupConfig = null;
@@ -1171,6 +1172,7 @@
return connection;
}
+ @Override
public void finalize() throws Throwable
{
if (!closed)
@@ -1262,7 +1264,7 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
-
+
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1374,7 +1376,7 @@
first = false;
long now = System.currentTimeMillis();
-
+
if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now >= lastCheck + connectionTTL )
{
if (!connection.checkDataReceived())
@@ -1405,7 +1407,7 @@
}
/**
- *
+ *
*/
public void send()
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01 12:39:30 UTC (rev 10907)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01 13:30:09 UTC (rev 10908)
@@ -40,7 +40,7 @@
/**
* A packet handler for all packets that need to be handled at the server level
- *
+ *
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
@@ -129,9 +129,9 @@
Version version = server.getVersion();
int[] compatibleList = version.getCompatibleVersionList();
boolean isCompatibleClient = false;
- for (int i = 0; i < compatibleList.length; i++)
+ for (int element : compatibleList)
{
- if (compatibleList[i] == request.getVersion())
+ if (element == request.getVersion())
{
isCompatibleClient = true;
break;
@@ -194,7 +194,7 @@
catch (HornetQException e)
{
log.error("Failed to create session ", e);
- response = new HornetQExceptionMessage((HornetQException)e);
+ response = new HornetQExceptionMessage(e);
if (e.getCode() == HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
{
@@ -251,7 +251,7 @@
// Even though session exists, we can't reattach since confi window size == -1,
// i.e. we don't have a resend cache for commands, so we just close the old session
// and let the client recreate
-
+
log.warn("Reattach request from " + connection.getRemoteAddress() + " failed as there is no confirmationWindowSize configured, which may be ok for your system");
sessionHandler.closeListeners();
13 years, 5 months
JBoss hornetq SVN: r10907 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/api/core and 4 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 08:39:30 -0400 (Fri, 01 Jul 2011)
New Revision: 10907
Removed:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java
Modified:
branches/HORNETQ-720_Replication/
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
merge from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-10900
+ /trunk:10878-10906
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java 2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java 2011-07-01 12:39:30 UTC (rev 10907)
@@ -53,27 +53,6 @@
private static final byte TYPE_STRING = 3;
/**
- * Utility method for splitting a comma separated list of hosts
- *
- * @param commaSeparatedHosts the comma separated host string
- * @return the hosts
- */
- public static String[] splitHosts(final String commaSeparatedHosts)
- {
- if (commaSeparatedHosts == null)
- {
- return new String[0];
- }
- String[] hosts = commaSeparatedHosts.split(",");
-
- for (int i = 0; i < hosts.length; i++)
- {
- hosts[i] = hosts[i].trim();
- }
- return hosts;
- }
-
- /**
* Creates a default TransportConfiguration with no configured transport.
*/
public TransportConfiguration()
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-07-01 12:39:30 UTC (rev 10907)
@@ -37,9 +37,33 @@
*/
public class ChannelImpl implements Channel
{
+ public enum CHANNEL_ID {
+
+ /**
+ * Used for core protocol management.
+ * @see CoreProtocolManager
+ */
+ PING(0),
+ /** Session creation and attachment. */
+ SESSION(1),
+ /** Replication, i.e. for backups that do not share the journal. */
+ REPLICATION(2),
+ /**
+ * Channels [0-9] are reserved for the system, user channels must be greater than that.
+ */
+ USER(10);
+
+ public final long id;
+
+ CHANNEL_ID(long id)
+ {
+ this.id = id;
+ }
+ }
+
private static final Logger log = Logger.getLogger(ChannelImpl.class);
- private volatile long id;
+ private final long id;
private ChannelHandler handler;
@@ -336,7 +360,7 @@
// And switch it
- final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
+ final CoreRemotingConnection rnewConnection = newConnection;
rnewConnection.putChannel(id, this);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-01 12:39:30 UTC (rev 10907)
@@ -85,7 +85,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -158,15 +157,17 @@
* A PacketDecoder
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
*/
-public class PacketDecoder
+public final class PacketDecoder
{
- private static final Logger log = Logger.getLogger(PacketDecoder.class);
- public Packet decode(final HornetQBuffer in)
+ private PacketDecoder()
{
+ // Utility
+ }
+
+ public static Packet decode(final HornetQBuffer in)
+ {
final byte packetType = in.readByte();
Packet packet;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:39:30 UTC (rev 10907)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
@@ -71,11 +72,7 @@
private final boolean client;
- // Channels 0-9 are reserved for the system
- // 0 is for pinging
- // 1 is for session creation and attachment
- // 2 is for replication
- private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
+ private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
private boolean idGeneratorSynced = false;
@@ -83,18 +80,16 @@
private final Object failLock = new Object();
- private final PacketDecoder decoder = new PacketDecoder();
-
private volatile boolean dataReceived;
private final Executor executor;
-
+
private volatile boolean executing;
-
+
private final SimpleString nodeID;
private final long creationTime;
-
+
private String clientID;
// Constructors
@@ -139,9 +134,9 @@
this.client = client;
this.executor = executor;
-
+
this.nodeID = nodeID;
-
+
this.creationTime = System.currentTimeMillis();
}
@@ -174,7 +169,7 @@
{
return transportConnection.getRemoteAddress();
}
-
+
public long getCreationTime()
{
return creationTime;
@@ -247,26 +242,26 @@
public List<CloseListener> removeCloseListeners()
{
List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
-
+
closeListeners.clear();
-
+
return ret;
}
public List<FailureListener> removeFailureListeners()
{
List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
-
+
failureListeners.clear();
-
- return ret;
+
+ return ret;
}
public void setCloseListeners(List<CloseListener> listeners)
{
closeListeners.clear();
-
- closeListeners.addAll(listeners);
+
+ closeListeners.addAll(listeners);
}
public HornetQBuffer createBuffer(final int size)
@@ -323,7 +318,7 @@
callClosingListeners();
}
-
+
public void disconnect()
{
Channel channel0 = getChannel(0, -1);
@@ -331,13 +326,13 @@
// And we remove all channels from the connection, this ensures no more packets will be processed after this
// method is
// complete
-
+
Set<Channel> allChannels = new HashSet<Channel>(channels.values());
removeAllChannels();
// Now we are 100% sure that no more packets will be processed we can flush then send the disconnect
-
+
for (Channel channel: allChannels)
{
channel.flushConfirmations();
@@ -408,7 +403,7 @@
}
}
}
-
+
public void checkFlushBatchBuffer()
{
transportConnection.checkFlushBatchBuffer();
@@ -421,12 +416,12 @@
{
try
{
- final Packet packet = decoder.decode(buffer);
-
+ final Packet packet = PacketDecoder.decode(buffer);
+
if (packet.isAsyncExec() && executor != null)
{
executing = true;
-
+
executor.execute(new Runnable()
{
public void run()
@@ -439,7 +434,7 @@
{
RemotingConnectionImpl.log.error("Unexpected error", t);
}
-
+
executing = false;
}
});
@@ -451,13 +446,13 @@
{
Thread.yield();
}
-
+
// Pings must always be handled out of band so we can send pings back to the client quickly
// otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}
-
- dataReceived = true;
+
+ dataReceived = true;
}
catch (Exception e)
{
@@ -515,8 +510,8 @@
{
channels.clear();
}
- }
-
+ }
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
@@ -572,7 +567,7 @@
{
clientID = cID;
}
-
+
public String getClientID()
{
return clientID;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-07-01 12:39:30 UTC (rev 10907)
@@ -32,7 +32,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
@@ -467,7 +466,7 @@
private void startServerChannels()
{
- String[] hosts = TransportConfiguration.splitHosts(host);
+ String[] hosts = NettyAcceptor.splitHosts(host);
for (String h : hosts)
{
SocketAddress address;
@@ -613,6 +612,27 @@
// Inner classes -----------------------------------------------------------------------------
+ /**
+ * Utility method for splitting a comma separated list of hosts
+ *
+ * @param commaSeparatedHosts the comma separated host string
+ * @return the hosts
+ */
+ public static String[] splitHosts(final String commaSeparatedHosts)
+ {
+ if (commaSeparatedHosts == null)
+ {
+ return new String[0];
+ }
+ String[] hosts = commaSeparatedHosts.split(",");
+
+ for (int i = 0; i < hosts.length; i++)
+ {
+ hosts[i] = hosts[i].trim();
+ }
+ return hosts;
+ }
+
private final class HornetQServerChannelHandler extends HornetQChannelHandler
{
HornetQServerChannelHandler(final ChannelGroup group,
Deleted: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java 2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java 2011-07-01 12:39:30 UTC (rev 10907)
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.config.impl;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A TransportConfigurationTest
- *
- * @author jmesnil
- *
- * Created 20 janv. 2009 14:46:35
- *
- *
- */
-public class TransportConfigurationTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testSplitNullAddress() throws Exception
- {
- String[] addresses = TransportConfiguration.splitHosts(null);
-
- Assert.assertNotNull(addresses);
- Assert.assertEquals(0, addresses.length);
- }
-
- public void testSplitSingleAddress() throws Exception
- {
- String[] addresses = TransportConfiguration.splitHosts("localhost");
-
- Assert.assertNotNull(addresses);
- Assert.assertEquals(1, addresses.length);
- Assert.assertEquals("localhost", addresses[0]);
- }
-
- public void testSplitManyAddresses() throws Exception
- {
- String[] addresses = TransportConfiguration.splitHosts("localhost, 127.0.0.1, 192.168.0.10");
-
- Assert.assertNotNull(addresses);
- Assert.assertEquals(3, addresses.length);
- Assert.assertEquals("localhost", addresses[0]);
- Assert.assertEquals("127.0.0.1", addresses[1]);
- Assert.assertEquals("192.168.0.10", addresses[2]);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-07-01 12:39:30 UTC (rev 10907)
@@ -83,7 +83,7 @@
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
-
+
public void connectionReadyForWrites(Object connectionID, boolean ready)
{
}
@@ -108,12 +108,40 @@
acceptor.stop();
Assert.assertFalse(acceptor.isStarted());
UnitTestCase.checkFreePort(TransportConstants.DEFAULT_PORT);
-
+
pool1.shutdown();
pool2.shutdown();
-
+
pool1.awaitTermination(1, TimeUnit.SECONDS);
pool2.awaitTermination(1, TimeUnit.SECONDS);
}
+ public void testSplitNullAddress() throws Exception
+ {
+ String[] addresses = NettyAcceptor.splitHosts(null);
+
+ Assert.assertNotNull(addresses);
+ Assert.assertEquals(0, addresses.length);
+ }
+
+ public void testSplitSingleAddress() throws Exception
+ {
+ String[] addresses = NettyAcceptor.splitHosts("localhost");
+
+ Assert.assertNotNull(addresses);
+ Assert.assertEquals(1, addresses.length);
+ Assert.assertEquals("localhost", addresses[0]);
+ }
+
+ public void testSplitManyAddresses() throws Exception
+ {
+ String[] addresses = NettyAcceptor.splitHosts("localhost, 127.0.0.1, 192.168.0.10");
+
+ Assert.assertNotNull(addresses);
+ Assert.assertEquals(3, addresses.length);
+ Assert.assertEquals("localhost", addresses[0]);
+ Assert.assertEquals("127.0.0.1", addresses[1]);
+ Assert.assertEquals("192.168.0.10", addresses[2]);
+ }
+
}
13 years, 5 months