JBoss hornetq SVN: r8647 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 13:59:58 -0500 (Wed, 09 Dec 2009)
New Revision: 8647
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
tweak on doc
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 18:59:05 UTC (rev 8646)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 18:59:58 UTC (rev 8647)
@@ -256,9 +256,9 @@
timer.resumeSpin();
}
+
/**
- * force means the Journal is moving to a new file. Any pending write need to be done immediately
- * or data could be lost
+ * Note: Flush could be called by either the CheckTime, or by the Journal directly when moving to a new file
* */
public synchronized void flush()
{
15 years
JBoss hornetq SVN: r8646 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 13:59:05 -0500 (Wed, 09 Dec 2009)
New Revision: 8646
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
duh... removing extra synchronized block
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 18:30:41 UTC (rev 8645)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 18:59:05 UTC (rev 8646)
@@ -262,43 +262,40 @@
* */
public synchronized void flush()
{
- synchronized (this)
+ if (buffer.writerIndex() > 0)
{
- if (buffer.writerIndex() > 0)
+ int pos = buffer.writerIndex();
+
+ if (logRates)
{
- int pos = buffer.writerIndex();
+ bytesFlushed.addAndGet(pos);
+ }
- if (logRates)
- {
- bytesFlushed.addAndGet(pos);
- }
+ ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
- ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+ // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+ // Using bufferToFlush.put(buffer) would make several append calls for each byte
- // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using bufferToFlush.put(buffer) would make several append calls for each byte
+ bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
- bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
+ if (bufferToFlush != null)
+ {
+ bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+ }
- if (bufferToFlush != null)
- {
- bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
- }
+ lastFlushTime.set(System.nanoTime());
- lastFlushTime.set(System.nanoTime());
+ pendingSync = false;
- pendingSync = false;
+ callbacks = new LinkedList<IOAsyncTask>();
- callbacks = new LinkedList<IOAsyncTask>();
+ buffer.clear();
- buffer.clear();
+ bufferLimit = 0;
- bufferLimit = 0;
+ flushesDone.incrementAndGet();
- flushesDone.incrementAndGet();
-
- timer.pauseSpin();
- }
+ timer.pauseSpin();
}
}
15 years
JBoss hornetq SVN: r8645 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 13:30:41 -0500 (Wed, 09 Dec 2009)
New Revision: 8645
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
remove non necessary synchronized block
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 18:01:48 UTC (rev 8644)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 18:30:41 UTC (rev 8645)
@@ -382,35 +382,26 @@
// Needs to be called within synchronized blocks on TimedBuffer
public void resumeSpin()
{
- synchronized (TimedBuffer.this)
- {
- if (!spinning)
- {
- spinning = true;
- spinLimiter.release();
- }
- }
+ spinning = true;
+ spinLimiter.release();
}
// Needs to be called within synchronized blocks on TimedBuffer
public void pauseSpin()
{
- synchronized (TimedBuffer.this)
+ if (spinning)
{
- if (spinning)
+ spinning = false;
+ try
{
- spinning = false;
- try
+ if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
{
- if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
- {
- throw new IllegalStateException("Internal error on TimedBuffer. Can't stop spinning");
- }
+ throw new IllegalStateException("Internal error on TimedBuffer. Can't stop spinning");
}
- catch (InterruptedException ignored)
- {
- }
}
+ catch (InterruptedException ignored)
+ {
+ }
}
}
15 years
JBoss hornetq SVN: r8644 - in trunk/src/main/org/hornetq/core: remoting and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-09 13:01:48 -0500 (Wed, 09 Dec 2009)
New Revision: 8644
Modified:
trunk/src/main/org/hornetq/core/client/MessageHandler.java
trunk/src/main/org/hornetq/core/client/SessionFailureListener.java
trunk/src/main/org/hornetq/core/remoting/FailureListener.java
Log:
HORNETQ-186: fill in Javadocs for core API
Modified: trunk/src/main/org/hornetq/core/client/MessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/MessageHandler.java 2009-12-09 17:34:17 UTC (rev 8643)
+++ trunk/src/main/org/hornetq/core/client/MessageHandler.java 2009-12-09 18:01:48 UTC (rev 8644)
@@ -15,12 +15,21 @@
/**
*
- * A MessageHandler
+ * A MessageHandler is used to receive messages <em>asynchronously</em>.
*
+ * To receive messages asynchronously, a MessageHandler is set on a ClientConsumer.
+ * Every time the consumer will receive a message, it will call the handler's {@code onMessage()} method.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
+ * @see ClientConsumer#setMessageHandler(MessageHandler)
*/
public interface MessageHandler
{
+ /**
+ * Notifies the MessageHandler that a message has been received.
+ *
+ * @param message a message
+ */
void onMessage(ClientMessage message);
}
Modified: trunk/src/main/org/hornetq/core/client/SessionFailureListener.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/SessionFailureListener.java 2009-12-09 17:34:17 UTC (rev 8643)
+++ trunk/src/main/org/hornetq/core/client/SessionFailureListener.java 2009-12-09 18:01:48 UTC (rev 8644)
@@ -17,13 +17,18 @@
import org.hornetq.core.remoting.FailureListener;
/**
- * A SessionFailureListener
+ * A SessionFailureListener notifies the client when a failure occurred on the session.
*
- * @author Tim Fox
- *
- *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public interface SessionFailureListener extends FailureListener
{
+ /**
+ * Notifies that a connection has failed due to the specified exception.
+ * <br>
+ * This method is called <em>before the session attempts to reconnect/failover</em>.
+ *
+ * @param exception exception which has caused the connection to fail
+ */
void beforeReconnect(HornetQException exception);
}
Modified: trunk/src/main/org/hornetq/core/remoting/FailureListener.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/FailureListener.java 2009-12-09 17:34:17 UTC (rev 8643)
+++ trunk/src/main/org/hornetq/core/remoting/FailureListener.java 2009-12-09 18:01:48 UTC (rev 8644)
@@ -17,7 +17,7 @@
/**
*
- * A FailureListener
+ * A FailureListener notifies the user when a connection failure occurred.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -25,5 +25,10 @@
*/
public interface FailureListener
{
- void connectionFailed(HornetQException me);
+ /**
+ * Notifies that a connection has failed due to the specified exception.
+ *
+ * @param exception exception which has caused the connection to fail
+ */
+ void connectionFailed(HornetQException exception);
}
15 years
JBoss hornetq SVN: r8643 - in trunk: examples/jms/dead-letter and 22 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-09 12:34:17 -0500 (Wed, 09 Dec 2009)
New Revision: 8643
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/core-bridges.xml
trunk/docs/user-manual/en/examples.xml
trunk/docs/user-manual/en/message-expiry.xml
trunk/docs/user-manual/en/undelivered-messages.xml
trunk/docs/user-manual/en/using-core.xml
trunk/examples/jms/dead-letter/readme.html
trunk/examples/jms/dead-letter/src/org/hornetq/jms/example/DeadLetterExample.java
trunk/examples/jms/expiry/src/org/hornetq/jms/example/ExpiryExample.java
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/core/client/SendAcknowledgementHandler.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/message/BodyEncoder.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/PropertyConversionException.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/paging/PagingManager.java
trunk/src/main/org/hornetq/core/paging/PagingStoreFactory.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/security/Role.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
HORNETQ-185 + HORNETQ-186: API review + javadoc
* use consistently address instead of destination for Core Message API
* updated doc + examples
* added javadoc to Message interface
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-12-09 17:34:17 UTC (rev 8643)
@@ -529,7 +529,7 @@
<entry><link linkend="core-bridges"
>bridges.forwarding-address</link></entry>
<entry>String</entry>
- <entry>address to forward to. If omitted original destination is
+ <entry>address to forward to. If omitted original address is
used</entry>
<entry>null</entry>
</row>
Modified: trunk/docs/user-manual/en/core-bridges.xml
===================================================================
--- trunk/docs/user-manual/en/core-bridges.xml 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/docs/user-manual/en/core-bridges.xml 2009-12-09 17:34:17 UTC (rev 8643)
@@ -90,7 +90,7 @@
<listitem>
<para><literal>forwarding-address</literal>. This is the address on the target
server that the message will be forwarded to. If a forwarding address is not
- specified then the original destination of the message will be retained.</para>
+ specified, then the original address of the message will be retained.</para>
</listitem>
<listitem>
<para><literal>filter-string</literal>. An optional filter string can be supplied.
Modified: trunk/docs/user-manual/en/examples.xml
===================================================================
--- trunk/docs/user-manual/en/examples.xml 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/docs/user-manual/en/examples.xml 2009-12-09 17:34:17 UTC (rev 8643)
@@ -244,7 +244,7 @@
<para>The <literal>management-notification</literal> example shows how to receive
management notifications from HornetQ using JMS messages. HornetQ servers emit
management notifications when events of interest occur (consumers are created or
- closed, destinations are created or deleted, security authentication fails,
+ closed, addresses are created or deleted, security authentication fails,
etc.).</para>
</section>
<section id="examples.consumer-rate-limit">
@@ -268,9 +268,9 @@
period of time before being removed. JMS specification states that clients should
not receive messages that have been expired (but it does not guarantee this will not
happen).</para>
- <para>HornetQ can assign an expiry destination to a given queue so that when messages
- are expired, they are removed from the queue and sent to the expiry destination.
- These "expired" messages can later be consumed from the expiry destination for
+ <para>HornetQ can assign an expiry address to a given queue so that when messages
+ are expired, they are removed from the queue and sent to the expiry address.
+ These "expired" messages can later be consumed from the expiry address for
further inspection.</para>
</section>
<section id="examples.message-group">
Modified: trunk/docs/user-manual/en/message-expiry.xml
===================================================================
--- trunk/docs/user-manual/en/message-expiry.xml 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/docs/user-manual/en/message-expiry.xml 2009-12-09 17:34:17 UTC (rev 8643)
@@ -43,8 +43,8 @@
properties:</para>
<itemizedlist>
<listitem>
- <para><literal>_HQ_ORIG_DESTINATION</literal></para>
- <para>a String property containing the <emphasis>original destination</emphasis> of the
+ <para><literal>_HQ_ORIG_ADDRESS</literal></para>
+ <para>a String property containing the <emphasis>original address</emphasis> of the
expired message </para>
</listitem>
<listitem>
Modified: trunk/docs/user-manual/en/undelivered-messages.xml
===================================================================
--- trunk/docs/user-manual/en/undelivered-messages.xml 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/docs/user-manual/en/undelivered-messages.xml 2009-12-09 17:34:17 UTC (rev 8643)
@@ -106,8 +106,8 @@
property:</para>
<itemizedlist>
<listitem>
- <para><literal>_HQ_ORIG_DESTINATION</literal></para>
- <para>a String property containing the <emphasis>original destination</emphasis> of
+ <para><literal>_HQ_ORIG_ADDRESS</literal></para>
+ <para>a String property containing the <emphasis>original address</emphasis> of
the dead letter message </para>
</listitem>
</itemizedlist>
Modified: trunk/docs/user-manual/en/using-core.xml
===================================================================
--- trunk/docs/user-manual/en/using-core.xml 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/docs/user-manual/en/using-core.xml 2009-12-09 17:34:17 UTC (rev 8643)
@@ -36,8 +36,8 @@
properties which are key-value pairs. Each property key is a string and property
values can be of type integer, long, short, byte, byte[], String, double, float or
boolean.</para>
- <para>A message has a destination which represents the <emphasis>address</emphasis> it
- is being sent to. When the message arrives on the server it is routed to any queues
+ <para>A message has an <emphasis>address</emphasis> it is being sent to.
+ When the message arrives on the server it is routed to any queues
that are bound to the address. An address may have many queues bound to it or even
none. There may also be entities other than queues, like <emphasis role="italic"
>diverts</emphasis> bound to addresses.</para>
@@ -56,8 +56,8 @@
<title>Address</title>
<para>A server maintains a mapping between an address and a set of queues. Zero or more
queues can be bound to a single address. Each queue can be bound with an optional
- message filter. When a message is routed to an address it is routed to the set of
- queues bound to the message's destination address. If any of the queues are bound
+ message filter. When a message is routed, it is in fact routed to the set of
+ queues bound to the message's address. If any of the queues are bound
with a filter expression, then the message will only be routed to the subset of
bound queues which match that filter expression.</para>
<para>Other entities, such as <emphasis role="italic">diverts</emphasis> can also be
Modified: trunk/examples/jms/dead-letter/readme.html
===================================================================
--- trunk/examples/jms/dead-letter/readme.html 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/examples/jms/dead-letter/readme.html 2009-12-09 17:34:17 UTC (rev 8643)
@@ -13,15 +13,15 @@
Such a message goes back to the JMS destination ready to be redelivered.
However, this means it is possible for a message to be delivered again and again without any success and remain in the destination, clogging the system.</p>
<p>To prevent this, messaging systems define dead letter messages: after a specified unsuccessful delivery attempts, the message is removed from the destination
- and put instead in a <em>dead letter destination</em> where they can be consumed for further investigation.
+ and put instead in a <em>dead letter address</em> where they can be consumed for further investigation.
<p>
The example will show how to configure HornetQ to send a message to a dead letter destination after 3 unsuccessful delivery attempts.<br />
The example will send 1 message to a queue. We will deliver the message 3 times and rollback the session every time.<br />
- On the 4th attempt, there won't be any message to consume: it will have been moved to a <em>dead letter queue</em>.<br />
- We will then consume the message from this dead letter queue.
+ On the 4th attempt, there won't be any message to consume: it will have been moved to a <em>dead letter address</em>.<br />
+ We will then consume this dead letter message.
</p>
<h2>Example setup</h2>
- <p><em>Dead letter destinations</em> and <em>maximum delivery attempts</em> are defined in the configuration file <a href="server0/hornetq-configuration.xml">hornetq-configuration.xml</a>:</p>
+ <p><em>Dead letter addresses</em> and <em>maximum delivery attempts</em> are defined in the configuration file <a href="server0/hornetq-configuration.xml">hornetq-configuration.xml</a>:</p>
<pre class="prettyprint">
<code><address-setting match="jms.queue.exampleQueue">
<dead-letter-address>jms.queue.deadLetterQueue</dead-letter-address>
@@ -170,9 +170,9 @@
<code>System.out.println("Destination of the message: " + ((Queue)messageReceived.getJMSDestination()).getQueueName());</code>
</pre>
- <li>The <strong>origin destination</strong> is stored in the <code>_HORNETQ_ORIG_DESTINATION</code> property
+ <li>The <strong>origin destination</strong> is stored in the <code>_HQ_ORIG_ADDRESS</code> property
<pre class="prettyprint">
- <code>System.out.println("*Origin destination* of the message: " + messageReceived.getStringProperty("_HORNETQ_ORIG_DESTINATION"));</code>
+ <code>System.out.println("*Origin destination* of the message: " + messageReceived.getStringProperty("_HQ_ORIG_ADDRESS"));</code>
</pre>
<li>We do not forget to commit the session to acknowledge that we have received the message from the dead letter queue</li>
Modified: trunk/examples/jms/dead-letter/src/org/hornetq/jms/example/DeadLetterExample.java
===================================================================
--- trunk/examples/jms/dead-letter/src/org/hornetq/jms/example/DeadLetterExample.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/examples/jms/dead-letter/src/org/hornetq/jms/example/DeadLetterExample.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -126,8 +126,8 @@
// Step 21. the messageReceived's destination is now the dead letter queue.
System.out.println("Destination of the message: " + ((Queue)messageReceived.getJMSDestination()).getQueueName());
- // Step 22. the *origin* destination is stored in the _HORNETQ_ORIG_DESTINATION property
- System.out.println("*Origin destination* of the message: " + messageReceived.getStringProperty("_HORNETQ_ORIG_DESTINATION"));
+ // Step 22. the *origin* destination is stored in the _HQ_ORIG_ADDRESS property
+ System.out.println("*Origin destination* of the message: " + messageReceived.getStringProperty("_HQ_ORIG_ADDRESS"));
// Step 23. This time, we commit the session, the delivery from the dead letter queue is successful!
session.commit();
Modified: trunk/examples/jms/expiry/src/org/hornetq/jms/example/ExpiryExample.java
===================================================================
--- trunk/examples/jms/expiry/src/org/hornetq/jms/example/ExpiryExample.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/examples/jms/expiry/src/org/hornetq/jms/example/ExpiryExample.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -111,9 +111,9 @@
System.out.println("Expiration time of the expired message (relative to the expiry queue): " + messageReceived.getJMSExpiration());
System.out.println();
- // Step 20. the *origin* destination is stored in the _HQ_ORIG_DESTINATION property
- System.out.println("*Origin destination* of the expired message: " + messageReceived.getStringProperty("_HQ_ORIG_DESTINATION"));
- // Step 21. the actual expiration time is stored in the _HQ_ORIG_DESTINATION property
+ // Step 20. the *origin* destination is stored in the _HQ_ORIG_ADDRESS property
+ System.out.println("*Origin destination* of the expired message: " + messageReceived.getStringProperty("_HQ_ORIG_ADDRESS"));
+ // Step 21. the actual expiration time is stored in the _HQ_ACTUAL_EXPIRY property
System.out.println("*Actual expiration time* of the expired message: " + messageReceived.getLongProperty("_HQ_ACTUAL_EXPIRY"));
return true;
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -353,9 +353,6 @@
/**
* Returns whether consumers created through this factory will block while sending message acknowledgements or do it asynchronously.
*
- * If the consumer are configured to send message acknowledgement asynchronously, you can set a SendAcknowledgementHandler on the ClientSession
- * to be notified once the acknowledgement has been handled by the server.
- *
* Default value is {@value org.hornetq.core.client.impl.ClientSessionFactoryImpl#DEFAULT_BLOCK_ON_ACKNOWLEDGE}.
*
* @return whether consumers will block while sending message acknowledgements or do it asynchronously
@@ -371,6 +368,9 @@
/**
* Returns whether producers created through this factory will block while sending <em>durable</em> messages or do it asynchronously.
+ * <br>
+ * If the session is configured to send durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
+ * to be notified once the message has been handled by the server.
*
* Default value is {@value org.hornetq.core.client.impl.ClientSessionFactoryImpl#DEFAULT_BLOCK_ON_DURABLE_SEND}.
*
@@ -387,6 +387,9 @@
/**
* Returns whether producers created through this factory will block while sending <em>non-durable</em> messages or do it asynchronously.
+ * <br>
+ * If the session is configured to send non-durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
+ * to be notified once the message has been handled by the server.
*
* Default value is {@value org.hornetq.core.client.impl.ClientSessionFactoryImpl#DEFAULT_BLOCK_ON_NON_DURABLE_SEND}.
*
Modified: trunk/src/main/org/hornetq/core/client/SendAcknowledgementHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/SendAcknowledgementHandler.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/SendAcknowledgementHandler.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -16,15 +16,21 @@
import org.hornetq.core.message.Message;
/**
- * A SendAcknowledgementHandler
+ * A SendAcknowledgementHandler notifies a client when a message sent asynchronously has been received by the server.
+ * <br />
+ * If the session is not blocking when sending durable or non-durable messages, the session can
+ * set a SendAcknowledgementHandler to be notified later when the messages
+ * has been received by the server. The method {@code sendAcknowledged} will be called with the message that
+ * was sent asynchronously.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 9 Feb 2009 12:49:34
- *
- *
*/
public interface SendAcknowledgementHandler
{
+ /**
+ * Notifies the client that a message sent asynchronously has been received by the server.
+ *
+ * @param message message sent asynchronously
+ */
void sendAcknowledged(Message message);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -137,15 +137,11 @@
return "ClientMessage[messageID=" + messageID +
", durable=" +
durable +
- ", destination=" +
- getDestination() +
+ ", address=" +
+ getAddress() +
"]";
}
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getBodyEncoder()
- */
-
// FIXME - only used for large messages - move it!
/* (non-Javadoc)
* @see org.hornetq.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -24,9 +24,9 @@
*/
public interface ClientProducerCreditManager
{
- ClientProducerCredits getCredits(SimpleString destination);
+ ClientProducerCredits getCredits(SimpleString address);
- void receiveCredits(SimpleString destination, int credits, int offset);
+ void receiveCredits(SimpleString address, int credits, int offset);
void reset();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -40,24 +40,24 @@
this.windowSize = windowSize;
}
- public synchronized ClientProducerCredits getCredits(final SimpleString destination)
+ public synchronized ClientProducerCredits getCredits(final SimpleString address)
{
- ClientProducerCredits credits = producerCredits.get(destination);
+ ClientProducerCredits credits = producerCredits.get(address);
if (credits == null)
{
// Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, destination, windowSize);
+ credits = new ClientProducerCreditsImpl(session, address, windowSize);
- producerCredits.put(destination, credits);
+ producerCredits.put(address, credits);
}
return credits;
}
- public synchronized void receiveCredits(final SimpleString destination, final int credits, final int offset)
+ public synchronized void receiveCredits(final SimpleString address, final int credits, final int offset)
{
- ClientProducerCredits cr = producerCredits.get(destination);
+ ClientProducerCredits cr = producerCredits.get(address);
if (cr != null)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -33,19 +33,19 @@
private final int windowSize;
- private final SimpleString destination;
+ private final SimpleString address;
private final ClientSessionInternal session;
private int arriving;
public ClientProducerCreditsImpl(final ClientSessionInternal session,
- final SimpleString destination,
+ final SimpleString address,
final int windowSize)
{
this.session = session;
- this.destination = destination;
+ this.address = address;
this.windowSize = windowSize / 2;
@@ -120,7 +120,7 @@
private void requestCredits(final int credits)
{
- session.sendProducerCreditsMessage(credits, destination);
+ session.sendProducerCreditsMessage(credits, address);
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -205,14 +205,14 @@
if (address != null)
{
- msg.setDestination(address);
+ msg.setAddress(address);
// Anonymous
theCredits = session.getCredits(address);
}
else
{
- msg.setDestination(this.address);
+ msg.setAddress(this.address);
theCredits = credits;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -1039,9 +1039,9 @@
return failoverManager;
}
- public void sendProducerCreditsMessage(final int credits, final SimpleString destination)
+ public void sendProducerCreditsMessage(final int credits, final SimpleString address)
{
- channel.send(new SessionRequestProducerCreditsMessage(credits, destination));
+ channel.send(new SessionRequestProducerCreditsMessage(credits, address));
}
public ClientProducerCredits getCredits(final SimpleString address)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -68,7 +68,7 @@
void forceDelivery(long consumerID, long sequence) throws HornetQException;
- void sendProducerCreditsMessage(int credits, SimpleString destination);
+ void sendProducerCreditsMessage(int credits, SimpleString address);
ClientProducerCredits getCredits(SimpleString address);
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -503,9 +503,9 @@
session.workDone();
}
- public void sendProducerCreditsMessage(final int credits, final SimpleString destination)
+ public void sendProducerCreditsMessage(final int credits, final SimpleString address)
{
- session.sendProducerCreditsMessage(credits, destination);
+ session.sendProducerCreditsMessage(credits, address);
}
public ClientProducerCredits getCredits(final SimpleString address)
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -719,7 +719,7 @@
// consume
// them in
notificationMessage.setDurable(true);
- notificationMessage.setDestination(managementNotificationAddress);
+ notificationMessage.setAddress(managementNotificationAddress);
TypedProperties notifProps;
if (notification.getProperties() != null)
Modified: trunk/src/main/org/hornetq/core/message/BodyEncoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/BodyEncoder.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/message/BodyEncoder.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -20,20 +20,36 @@
/**
* Class used to encode message body into buffers.
+ * <br>
* Used to send large streams over the wire
+ *
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- * Created Nov 2, 2009
*/
public interface BodyEncoder
{
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void open() throws HornetQException;
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void close() throws HornetQException;
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
int encode(ByteBuffer bufferRead) throws HornetQException;
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
long getLargeBodySize();
}
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -23,9 +23,35 @@
import org.hornetq.utils.TypedProperties;
/**
- * A message is a routable instance that has a payload.
+ * A Message is a routable instance that has a payload.
+ * <br/>
+ * The payload (the "body") is opaque to the messaging system.
+ * A Message also has a fixed set of headers (required by the messaging system)
+ * and properties (defined by the users) that can be used by the messaging system
+ * to route the message (e.g. to ensure it matches a queue filter).
+ * <br>
+ * <h2>Message Properties</h2>
*
- * The payload is opaque to the messaging system.
+ * Message can contain properties specified by the users.
+ * It is possible to convert from some types to other types as specified
+ * by the following table:
+ * <pre>
+ * | | boolean byte short int long float double String byte[]
+ * |----------------------------------------------------------------
+ * |boolean | X X
+ * |byte | X X X X X
+ * |short | X X X X
+ * |int | X X X
+ * |long | X X
+ * |float | X X X
+ * |double | X X
+ * |String | X X X X X X X X
+ * |byte[] | X
+ * |-----------------------------------------------------------------
+ * </pre>
+ * <br>
+ * If conversion is not allowed (for example calling {@code getFloatProperty} on a property set as a {@code boolean}),
+ * a PropertyConversionException will be thrown.
*
* @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -36,52 +62,148 @@
*/
public interface Message
{
+ /**
+ * Returns the messageID.
+ * <br>
+ * The messageID is set when the message is handled by the server.
+ */
long getMessageID();
- SimpleString getDestination();
+ /**
+ * Returns the address this message is sent to.
+ */
+ SimpleString getAddress();
- void setDestination(SimpleString destination);
+ /**
+ * Sets the address to send this message to.
+ *
+ * This method must not be called directly by HornetQ clients.
+ *
+ * @param address address to send the message to
+ */
+ void setAddress(SimpleString address);
+ /**
+ * Returns this message type.
+ */
byte getType();
+ /**
+ * Returns whether this message is durable or not.
+ */
boolean isDurable();
+ /**
+ * Sets whether this message is durable or not.
+ *
+ * This method must not be called directly by HornetQ clients.
+
+ * @param durable {@code true} to flag this message as durable, {@code false} else
+ */
void setDurable(boolean durable);
+ /**
+ * Returns the expiration time of this message.
+ */
long getExpiration();
+ /**
+ * Returns whether this message is expired or not.
+ */
boolean isExpired();
+ /**
+ * Sets the expiration of this message.
+ * <br>
+ * This method must not be called directly by HornetQ clients.
+ *
+ * @param expiration expiration time
+ */
void setExpiration(long expiration);
+ /**
+ * Returns the message timestamp.
+ * <br>
+ * The timestamp corresponds to the time this message
+ * was handled by a HornetQ server.
+ */
long getTimestamp();
+ /**
+ * Sets the message timestamp.
+ * <br>
+ * This method must not be called directly by HornetQ clients.
+ *
+ * @param timestamp timestamp
+ */
void setTimestamp(long timestamp);
+ /**
+ * Returns the message priority.
+ *
+ * Values range from 0 (lowest priority) to 9 (highest priority) inclusive.
+ */
byte getPriority();
+ /**
+ * Sets the message priority.
+ *
+ * Value must be between 0 and 9 inclusive.
+ *
+ * @param priority the new message priority
+ */
void setPriority(byte priority);
+ /**
+ * Returns the size of the <em>encoded</em> message.
+ */
int getEncodeSize();
+ /**
+ * Returns whether this message is a <em>large message</em> or a regular message.
+ */
boolean isLargeMessage();
+ /**
+ * Returns the message body as a HornetQBuffer
+ */
HornetQBuffer getBodyBuffer();
// Should the following methods really be on the public API?
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void decodeFromBuffer(HornetQBuffer buffer);
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
int getEndOfMessagePosition();
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
int getEndOfBodyPosition();
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void checkCopy();
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void bodyChanged();
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void resetCopied();
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
HornetQBuffer getEncodedBuffer();
// Properties
@@ -89,100 +211,316 @@
TypedProperties getProperties();
+ /**
+ * Puts a boolean property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putBooleanProperty(SimpleString key, boolean value);
+ /**
+ * @see #putBooleanProperty(SimpleString, boolean)
+ */
+ void putBooleanProperty(String key, boolean value);
+
+ /**
+ * Puts a byte property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putByteProperty(SimpleString key, byte value);
+ /**
+ * @see #putByteProperty(SimpleString, byte)
+ */
+ void putByteProperty(String key, byte value);
+
+ /**
+ * Puts a byte[] property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putBytesProperty(SimpleString key, byte[] value);
+ /**
+ * @see #putBytesProperty(SimpleString, byte[])
+ */
+ void putBytesProperty(String key, byte[] value);
+
+ /**
+ * Puts a short property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putShortProperty(SimpleString key, short value);
+ /**
+ * @see #putShortProperty(SimpleString, short)
+ */
+ void putShortProperty(String key, short value);
+
+ /**
+ * Puts an int property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putIntProperty(SimpleString key, int value);
+ /**
+ * @see #putIntProperty(SimpleString, int)
+ */
+ void putIntProperty(String key, int value);
+
+ /**
+ * Puts a long property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putLongProperty(SimpleString key, long value);
+ /**
+ * @see #putLongProperty(SimpleString, long)
+ */
+ void putLongProperty(String key, long value);
+
+ /**
+ * Puts a float property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putFloatProperty(SimpleString key, float value);
+ /**
+ * @see #putFloatProperty(SimpleString, float)
+ */
+ void putFloatProperty(String key, float value);
+
+ /**
+ * Puts a double property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putDoubleProperty(SimpleString key, double value);
+ /**
+ * @see #putDoubleProperty(SimpleString, double)
+ */
+ void putDoubleProperty(String key, double value);
+
+ /**
+ * Puts a SimpleString property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
void putStringProperty(SimpleString key, SimpleString value);
+ /**
+ * Puts a String property in this message.
+ *
+ * @param key property name
+ * @param value property value
+ */
+ void putStringProperty(String key, String value);
+
+ /**
+ * Puts an Object property in this message.
+ * <br>
+ * Accepted types are:
+ * <ul>
+ * <li>Boolean</li>
+ * <li>Byte</li>
+ * <li>Short</li>
+ * <li>Integer</li>
+ * <li>Long</li>
+ * <li>Float</li>
+ * <li>Double</li>
+ * <li>String</li>
+ * <li>SimpleString</li>
+ * </ul>
+ *
+ * Using any other type will throw a PropertyConversionException.
+ *
+ * @param key property name
+ * @param value property value
+ *
+ * @throws PropertyConversionException if the value is not one of the accepted property types.
+ */
void putObjectProperty(SimpleString key, Object value) throws PropertyConversionException;
- void putBooleanProperty(String key, boolean value);
-
- void putByteProperty(String key, byte value);
-
- void putBytesProperty(String key, byte[] value);
-
- void putShortProperty(String key, short value);
-
- void putIntProperty(String key, int value);
-
- void putLongProperty(String key, long value);
-
- void putFloatProperty(String key, float value);
-
- void putDoubleProperty(String key, double value);
-
- void putStringProperty(String key, String value);
-
+ /**
+ * @see #putObjectProperty(SimpleString, Object)
+ */
void putObjectProperty(String key, Object value) throws PropertyConversionException;
void putTypedProperties(TypedProperties properties);
+ /**
+ * Removes the property corresponding to the specified key.
+ * @param key property name
+ * @return the value corresponding to the specified key or {@code null}
+ */
Object removeProperty(SimpleString key);
+
+ /**
+ * @see #removeProperty(SimpleString)
+ */
+ Object removeProperty(String key);
+
+ /**
+ * Returns {@code true} if this message contains a property with the given key, {@code false} otherwise.
+ *
+ * @param key property name
+ */
boolean containsProperty(SimpleString key);
+
+ /**
+ * @see #containsProperty(SimpleString)
+ */
+ boolean containsProperty(String key);
+ /**
+ * Returns the property corresponding to the specified key as a Boolean.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a Boolean
+ */
Boolean getBooleanProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getBooleanProperty(SimpleString)
+ */
Boolean getBooleanProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as a Byte.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a Byte
+ */
Byte getByteProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getByteProperty(SimpleString)
+ */
Byte getByteProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as a Double.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a Double
+ */
Double getDoubleProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getDoubleProperty(SimpleString)
+ */
Double getDoubleProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as an Integer.
+ *
+ * @throws PropertyConversionException if the value can not be converted to an Integer
+ */
Integer getIntProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getIntProperty(SimpleString)
+ */
Integer getIntProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as a Long.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a Long
+ */
Long getLongProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getLongProperty(SimpleString)
+ */
Long getLongProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key
+ */
Object getObjectProperty(SimpleString key);
+ /**
+ * @see #getObjectProperty(SimpleString)
+ */
Object getObjectProperty(String key);
+ /**
+ * Returns the property corresponding to the specified key as a Short.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a Short
+ */
Short getShortProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getShortProperty(SimpleString)
+ */
Short getShortProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as a Float.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a Float
+ */
Float getFloatProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getFloatProperty(SimpleString)
+ */
Float getFloatProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as a String.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a String
+ */
String getStringProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getStringProperty(SimpleString)
+ */
String getStringProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as a SimpleString.
+ *
+ * @throws PropertyConversionException if the value can not be converted to a SimpleString
+ */
SimpleString getSimpleStringProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getSimpleStringProperty(SimpleString)
+ */
SimpleString getSimpleStringProperty(String key) throws PropertyConversionException;
+ /**
+ * Returns the property corresponding to the specified key as a byte[].
+ *
+ * @throws PropertyConversionException if the value can not be converted to a byte[]
+ */
byte[] getBytesProperty(SimpleString key) throws PropertyConversionException;
+ /**
+ * @see #getBytesProperty(SimpleString)
+ */
byte[] getBytesProperty(String key) throws PropertyConversionException;
- Object removeProperty(String key);
-
- boolean containsProperty(String key);
-
+ /**
+ * Returns all the names of the properties for this message.
+ */
Set<SimpleString> getPropertyNames();
Map<String, Object> toMap();
@@ -190,17 +528,34 @@
// FIXME - All this stuff is only necessary here for large messages - it should be refactored to be put in a better
// place
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
int getHeadersAndPropertiesEncodeSize();
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
HornetQBuffer getWholeBuffer();
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void encodeHeadersAndProperties(HornetQBuffer buffer);
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
void decodeHeadersAndProperties(HornetQBuffer buffer);
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
BodyEncoder getBodyEncoder() throws HornetQException;
- /** Get the InputStream used on a message that will be sent over a producer */
+ /**
+ * This method must not be called directly by HornetQ clients.
+ */
InputStream getBodyInputStream();
}
Modified: trunk/src/main/org/hornetq/core/message/PropertyConversionException.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/PropertyConversionException.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/message/PropertyConversionException.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -14,30 +14,33 @@
package org.hornetq.core.message;
/**
- * A PropertyConversionException
+ * A PropertyConversionException is thrown by Message methods
+ * when a property can not be converted to the expected type.
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*
+ * @see Message
*
*/
public class PropertyConversionException extends RuntimeException
{
+ // Constants -----------------------------------------------------
+
private static final long serialVersionUID = -3010008708334904332L;
- public PropertyConversionException(final String message)
- {
- super(message);
- }
-
- // Constants -----------------------------------------------------
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
+
+ public PropertyConversionException(final String message)
+ {
+ super(message);
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -54,7 +54,7 @@
public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("_HQ_ACTUAL_EXPIRY");
- public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("_HQ_ORIG_DESTINATION");
+ public static final SimpleString HDR_ORIGINAL_ADDRESS = new SimpleString("_HQ_ORIG_ADDRESS");
public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");
@@ -74,7 +74,7 @@
protected long messageID;
- protected SimpleString destination;
+ protected SimpleString address;
protected byte type;
@@ -153,7 +153,7 @@
protected MessageImpl(final MessageImpl other)
{
messageID = other.getMessageID();
- destination = other.getDestination();
+ address = other.getAddress();
type = other.getType();
durable = other.isDurable();
expiration = other.getExpiration();
@@ -192,7 +192,7 @@
public int getHeadersAndPropertiesEncodeSize()
{
return DataConstants.SIZE_LONG + // Message ID
- /* Destination */SimpleString.sizeofString(destination) +
+ /* address */SimpleString.sizeofString(address) +
DataConstants./* Type */SIZE_BYTE +
DataConstants./* Durable */SIZE_BOOLEAN +
DataConstants./* Expiration */SIZE_LONG +
@@ -204,7 +204,7 @@
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
{
buffer.writeLong(messageID);
- buffer.writeSimpleString(destination);
+ buffer.writeSimpleString(address);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
@@ -216,7 +216,7 @@
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
messageID = buffer.readLong();
- destination = buffer.readSimpleString();
+ address = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
@@ -249,16 +249,16 @@
return messageID;
}
- public SimpleString getDestination()
+ public SimpleString getAddress()
{
- return destination;
+ return address;
}
- public void setDestination(final SimpleString destination)
+ public void setAddress(final SimpleString address)
{
- if (this.destination != destination)
+ if (this.address != address)
{
- this.destination = destination;
+ this.address = address;
bufferValid = false;
}
@@ -344,7 +344,7 @@
Map<String, Object> map = new HashMap<String, Object>();
map.put("messageID", messageID);
- map.put("destination", destination.toString());
+ map.put("address", address.toString());
map.put("type", type);
map.put("durable", durable);
map.put("expiration", expiration);
@@ -556,6 +556,10 @@
{
properties.putSimpleStringProperty(key, new SimpleString((String)value));
}
+ else if (value instanceof SimpleString)
+ {
+ properties.putSimpleStringProperty(key, (SimpleString)value);
+ }
else
{
throw new PropertyConversionException(value.getClass() + " is not a valid property type");
Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -30,9 +30,9 @@
| |
| |
| | 1
- | N +-------------------+
- +--------> | DestinationAdress |
- +-------------------+
+ | N +---------+
+ +--------> | Address |
+ +---------+
</PRE>
@@ -50,7 +50,7 @@
/** An injection point for the PostOffice to inject itself */
void setPostOffice(PostOffice postOffice);
- /** Used to start depaging every paged destination, after a reload/restart */
+ /** Used to start depaging every paged address, after a reload/restart */
void resumeDepages() throws Exception;
/**
Modified: trunk/src/main/org/hornetq/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStoreFactory.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/paging/PagingStoreFactory.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -30,7 +30,7 @@
*/
public interface PagingStoreFactory
{
- PagingStore newStore(SimpleString destinationName, AddressSettings addressSettings) throws Exception;
+ PagingStore newStore(SimpleString address, AddressSettings addressSettings) throws Exception;
void stop() throws InterruptedException;
@@ -42,6 +42,6 @@
List<PagingStore> reloadStores(HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception;
- SequentialFileFactory newFileFactory(SimpleString destinationName) throws Exception;
+ SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -86,14 +86,11 @@
return names.toArray(new SimpleString[names.size()]);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#reloadStores()
- */
public synchronized void reloadStores() throws Exception
{
- List<PagingStore> destinations = pagingStoreFactory.reloadStores(addressSettingsRepository);
+ List<PagingStore> reloadedStores = pagingStoreFactory.reloadStores(addressSettingsRepository);
- for (PagingStore store : destinations)
+ for (PagingStore store : reloadedStores)
{
store.start();
stores.put(store.getStoreName(), store);
@@ -101,10 +98,6 @@
}
- /**
- * @param destination
- * @return
- */
private synchronized PagingStore createPageStore(final SimpleString storeName) throws Exception
{
PagingStore store = stores.get(storeName);
@@ -244,10 +237,10 @@
// Private -------------------------------------------------------
- private PagingStore newStore(final SimpleString destinationName) throws Exception
+ private PagingStore newStore(final SimpleString address) throws Exception
{
- return pagingStoreFactory.newStore(destinationName,
- addressSettingsRepository.getMatch(destinationName.toString()));
+ return pagingStoreFactory.newStore(address,
+ addressSettingsRepository.getMatch(address.toString()));
}
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -102,7 +102,7 @@
syncNonTransactional);
}
- public synchronized SequentialFileFactory newFileFactory(final SimpleString destinationName) throws Exception
+ public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception
{
String guid = UUIDGenerator.getInstance().generateStringUUID();
@@ -120,7 +120,7 @@
try
{
- writer.write(destinationName.toString());
+ writer.write(address.toString());
writer.newLine();
}
finally
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -644,7 +644,7 @@
returnPage.open();
returnPage.delete();
- // This will trigger this Destination to exit the page mode,
+ // This will trigger this address to exit the page mode,
// and this will make HornetQ start using the journal again
return null;
}
@@ -917,7 +917,7 @@
* If persistent messages are also used, it will update eventual PageTransactions
*/
- private boolean onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> pagedMessages) throws Exception
+ private boolean onDepage(final int pageId, final SimpleString address, final List<PagedMessage> pagedMessages) throws Exception
{
if (PagingStoreImpl.isTrace)
{
@@ -1092,7 +1092,7 @@
addressFull);
}
- // It should stop the executor when the destination is full or when there is nothing else to be depaged
+ // It should stop the executor when the address is full or when there is nothing else to be depaged
if (addressFull || !isPaging())
{
depaging.set(false);
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -555,7 +555,7 @@
throw new IllegalStateException("Message cannot be routed more than once");
}
- SimpleString address = message.getDestination();
+ SimpleString address = message.getAddress();
setPagingStore(message);
@@ -567,7 +567,7 @@
if (duplicateID != null)
{
- cache = getDuplicateIDCache(message.getDestination());
+ cache = getDuplicateIDCache(message.getAddress());
if (duplicateID instanceof SimpleString)
{
@@ -662,7 +662,7 @@
{
message.setOriginalHeaders(message, false);
- message.setDestination(dlaAddress);
+ message.setAddress(dlaAddress);
route(message, context.getTransaction());
}
@@ -713,7 +713,7 @@
public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
{
- Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
+ Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress());
boolean res = false;
@@ -784,7 +784,7 @@
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
- message.setDestination(queueName);
+ message.setAddress(queueName);
message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true);
routeDirect(message, queue, false);
@@ -842,7 +842,7 @@
private void setPagingStore(final ServerMessage message) throws Exception
{
- PagingStore store = pagingManager.getPageStore(message.getDestination());
+ PagingStore store = pagingManager.getPageStore(message.getAddress());
message.setPagingStore(store);
}
@@ -992,7 +992,7 @@
{
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
- message.setDestination(queueName);
+ message.setAddress(queueName);
// message.setDurable(true);
String uid = UUIDGenerator.getInstance().generateStringUUID();
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -137,9 +137,9 @@
Address add = getAddress(binding.getAddress());
if (!add.containsWildCard())
{
- for (Address destination : add.getLinkedAddresses())
+ for (Address theAddress : add.getLinkedAddresses())
{
- Bindings bindings = super.getBindingsForRoutingAddress(destination.getAddress());
+ Bindings bindings = super.getBindingsForRoutingAddress(theAddress.getAddress());
if (bindings != null)
{
for (Binding b : bindings.getBindings())
@@ -151,9 +151,9 @@
}
else
{
- for (Address destination : add.getLinkedAddresses())
+ for (Address theAddress : add.getLinkedAddresses())
{
- super.removeBindingInternal(destination.getAddress(), uniqueName);
+ super.removeBindingInternal(theAddress.getAddress(), uniqueName);
}
}
removeAndUpdateAddressMap(add);
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -528,7 +528,7 @@
{
PagedMessage pgdMessage = packet.getPagedMessage();
ServerMessage msg = pgdMessage.getMessage(storage);
- Page page = getPage(msg.getDestination(), packet.getPageNumber());
+ Page page = getPage(msg.getAddress(), packet.getPageNumber());
page.write(pgdMessage);
}
Modified: trunk/src/main/org/hornetq/core/security/Role.java
===================================================================
--- trunk/src/main/org/hornetq/core/security/Role.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/security/Role.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -16,7 +16,7 @@
import java.io.Serializable;
/**
- * A role is used by the security store to define access rights and is configured on a connection factory or destination
+ * A role is used by the security store to define access rights and is configured on a connection factory or an address.
*
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
*/
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -413,7 +413,7 @@
{
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
- // the one pertinent for the destination node - this is important since different queues on different
+ // the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
message = message.copy();
@@ -472,7 +472,7 @@
else
{
// Preserve the original address
- dest = message.getDestination();
+ dest = message.getAddress();
}
producer.send(dest, message);
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -81,17 +81,17 @@
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
// We must make a copy of the message, otherwise things like returning credits to the page won't work
- // properly on ack, since the original destination will be overwritten
+ // properly on ack, since the original address will be overwritten
// TODO we can optimise this so it doesn't copy if it's not routed anywhere else
long id = storageManager.generateUniqueID();
ServerMessage copy = message.copy(id);
- // This will set the original MessageId, and the original destination
+ // This will set the original MessageId, and the original address
copy.setOriginalHeaders(message, false);
- copy.setDestination(forwardAddress);
+ copy.setAddress(forwardAddress);
if (transformer != null)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -1136,7 +1136,7 @@
queueDeployer.start();
}
- // We need to call this here, this gives any dependent server a chance to deploy its own destinations
+ // We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is initialised
callActivateCallbacks();
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -965,7 +965,7 @@
{
ServerMessage copyMessage = makeCopy(ref, expiry);
- copyMessage.setDestination(toAddress);
+ copyMessage.setAddress(toAddress);
postOffice.route(copyMessage, tx);
@@ -980,7 +980,7 @@
because otherwise we may end up with a ref with the same message id in the
queue more than once which would barf - this might happen if the same message had been
expire from multiple subscriptions of a topic for example
- We set headers that hold the original message destination, expiry time
+ We set headers that hold the original message address, expiry time
and original message id
*/
@@ -1054,7 +1054,7 @@
ServerMessage copyMessage = makeCopy(ref, expiry);
- copyMessage.setDestination(address);
+ copyMessage.setAddress(address);
postOffice.route(copyMessage, tx);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -351,7 +351,7 @@
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setDestination(messageQueue.getName());
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -195,7 +195,7 @@
because otherwise we may end up with a ref with the same message id in the
queue more than once which would barf - this might happen if the same message had been
expire from multiple subscriptions of a topic for example
- We set headers that hold the original message destination, expiry time
+ We set headers that hold the original message address, expiry time
and original message id
*/
@@ -210,16 +210,16 @@
{
if (other.containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
{
- putStringProperty(MessageImpl.HDR_ORIGINAL_DESTINATION,
- other.getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_DESTINATION));
+ putStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS,
+ other.getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS));
putLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID, other.getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID));
}
else
{
- SimpleString originalQueue = other.getDestination();
+ SimpleString originalQueue = other.getAddress();
- putStringProperty(MessageImpl.HDR_ORIGINAL_DESTINATION, originalQueue);
+ putStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS, originalQueue);
putLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID, other.getMessageID());
}
@@ -243,7 +243,7 @@
// On the server side, we reset the address to point to the instance of address in the paging store
// Otherwise each message would have its own copy of the address String which would take up more memory
- destination = pagingStore.getAddress();
+ address = pagingStore.getAddress();
}
public PagingStore getPagingStore()
@@ -293,8 +293,8 @@
return "ServerMessage[messageID=" + messageID +
", durable=" +
durable +
- ", destination=" +
- getDestination() +
+ ", address=" +
+ getAddress() +
"]";
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -1458,7 +1458,7 @@
message.setMessageID(id);
message.encodeMessageIDToBuffer();
- if (message.getDestination().equals(managementAddress))
+ if (message.getAddress().equals(managementAddress))
{
// It's a management message
@@ -1840,7 +1840,7 @@
{
try
{
- securityStore.check(message.getDestination(), CheckType.MANAGE, this);
+ securityStore.check(message.getAddress(), CheckType.MANAGE, this);
}
catch (HornetQException e)
{
@@ -1857,7 +1857,7 @@
if (replyTo != null)
{
- reply.setDestination(replyTo);
+ reply.setAddress(replyTo);
send(reply);
}
@@ -1927,7 +1927,7 @@
*/
private void releaseOutStanding(final ServerMessage message, final int credits) throws Exception
{
- CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
+ CreditManagerHolder holder = getCreditManagerHolder(message.getAddress());
holder.outstandingCredits -= credits;
@@ -1966,7 +1966,7 @@
// check the user has write access to this address.
try
{
- securityStore.check(msg.getDestination(), CheckType.SEND, this);
+ securityStore.check(msg.getAddress(), CheckType.SEND, this);
}
catch (HornetQException e)
{
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -443,7 +443,7 @@
{
if (dest == null)
{
- SimpleString sdest = message.getDestination();
+ SimpleString sdest = message.getAddress();
dest = sdest == null ? null : HornetQDestination.fromAddress(sdest.toString());
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -52,7 +52,7 @@
ClientSession session = cf.createSession(false, true, true);
ClientMessage message = session.createMessage(false);
// we need to set the destination so we can calculate the encodesize correctly
- message.setDestination(address);
+ message.setAddress(address);
int encodeSize = message.getEncodeSize();
session.close();
cf.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -62,7 +62,7 @@
ClientSession session = cf.createSession(false, true, true);
ClientMessage message = session.createMessage(false);
// we need to set the destination so we can calculate the encodesize correctly
- message.setDestination(address);
+ message.setAddress(address);
int encodeSize = message.getEncodeSize();
session.close();
cf.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -97,7 +97,7 @@
// Remove destination as an attribute from client producer.
// The destination always has to be set explicity before sending a message
- message.setDestination(QUEUE);
+ message.setAddress(QUEUE);
message.getBodyBuffer().writeString("testINVMCoreClient");
Modified: trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -208,7 +208,7 @@
Assert.assertEquals("Message:" + i, text);
// Check the headers
- SimpleString origDest = (SimpleString)tm.getObjectProperty(MessageImpl.HDR_ORIGINAL_DESTINATION);
+ SimpleString origDest = (SimpleString)tm.getObjectProperty(MessageImpl.HDR_ORIGINAL_ADDRESS);
Long origMessageId = (Long)tm.getObjectProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -172,7 +172,7 @@
ClientMessage expiredMessage = expiryConsumer.receive(500);
Assert.assertNotNull(expiredMessage);
Assert.assertNotNull(expiredMessage.getObjectProperty(MessageImpl.HDR_ACTUAL_EXPIRY_TIME));
- Assert.assertEquals(address, expiredMessage.getObjectProperty(MessageImpl.HDR_ORIGINAL_DESTINATION));
+ Assert.assertEquals(address, expiredMessage.getObjectProperty(MessageImpl.HDR_ORIGINAL_ADDRESS));
consumer.close();
expiryConsumer.close();
session.deleteQueue(queue);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -548,7 +548,7 @@
ClientSession session = cf.createSession(false, true, true);
ClientMessage message = session.createMessage(false);
// we need to set the destination so we can calculate the encodesize correctly
- message.setDestination(address);
+ message.setAddress(address);
int encodeSize = message.getEncodeSize();
session.close();
cf.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -284,7 +284,7 @@
ServerMessage msg = new ServerMessageImpl(1, 1024);
SimpleString dummy = new SimpleString("dummy");
- msg.setDestination(dummy);
+ msg.setAddress(dummy);
replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
@@ -317,7 +317,7 @@
ServerMessageImpl serverMsg = new ServerMessageImpl();
serverMsg.setMessageID(500);
- serverMsg.setDestination(new SimpleString("tttt"));
+ serverMsg.setAddress(new SimpleString("tttt"));
HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(100);
serverMsg.encodeHeadersAndProperties(buffer);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -104,7 +104,7 @@
public void testHQSize() throws Exception
{
- message.setDestination(RandomUtil.randomSimpleString());
+ message.setAddress(RandomUtil.randomSimpleString());
int encodeSize = message.getEncodeSize();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -64,8 +64,8 @@
final long timestamp2 = RandomUtil.randomLong();
final byte priority2 = RandomUtil.randomByte();
- message.setDestination(destination);
- Assert.assertEquals(destination, message.getDestination());
+ message.setAddress(destination);
+ Assert.assertEquals(destination, message.getAddress());
message.setDurable(durable2);
Assert.assertEquals(durable2, message.isDurable());
@@ -242,7 +242,7 @@
.toByteBuffer()
.array());
- Assert.assertEquals(msg1.getDestination(), msg2.getDestination());
+ Assert.assertEquals(msg1.getAddress(), msg2.getAddress());
Set<SimpleString> props1 = msg1.getPropertyNames();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -106,7 +106,7 @@
for (int i = 0; i < msgs.size(); i++)
{
- Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getDestination());
+ Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
.getMessage(null)
@@ -178,7 +178,7 @@
for (int i = 0; i < msgs.size(); i++)
{
- Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getDestination());
+ Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
.getMessage(null)
@@ -221,7 +221,7 @@
buffers.add(msg.getBodyBuffer());
- msg.setDestination(simpleDestination);
+ msg.setAddress(simpleDestination);
page.write(new PagedMessageImpl(msg));
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -118,7 +118,7 @@
{
ServerMessage msg = new ServerMessageImpl(messageId, 200);
- msg.setDestination(destination);
+ msg.setAddress(destination);
msg.getBodyBuffer().writeBytes(buffer);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -611,7 +611,7 @@
ServerMessage msgWritten = buffers.remove(id);
buffers2.put(id, msg.getMessage(null));
Assert.assertNotNull(msgWritten);
- Assert.assertEquals(msg.getMessage(null).getDestination(), msgWritten.getDestination());
+ Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
UnitTestCase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage(null).getBodyBuffer());
}
}
@@ -681,7 +681,7 @@
long id = msg.getMessage(null).getBodyBuffer().readLong();
ServerMessage msgWritten = buffers2.remove(id);
Assert.assertNotNull(msgWritten);
- Assert.assertEquals(msg.getMessage(null).getDestination(), msgWritten.getDestination());
+ Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().toByteBuffer().array(), msg.getMessage(null)
.getBodyBuffer()
.toByteBuffer()
@@ -727,7 +727,7 @@
{
ServerMessage msg = new ServerMessageImpl(id, 50 + buffer.capacity());
- msg.setDestination(destination);
+ msg.setAddress(destination);
msg.setPagingStore(store);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -490,7 +490,7 @@
return null;
}
- public SimpleString getDestination()
+ public SimpleString getAddress()
{
// TODO Auto-generated method stub
return null;
@@ -832,7 +832,7 @@
}
- public void setDestination(final SimpleString destination)
+ public void setAddress(final SimpleString destination)
{
// TODO Auto-generated method stub
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-12-09 16:59:54 UTC (rev 8642)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-12-09 17:34:17 UTC (rev 8643)
@@ -856,7 +856,7 @@
message.getBodyBuffer().writeString(UUID.randomUUID().toString());
- message.setDestination(new SimpleString("foo"));
+ message.setAddress(new SimpleString("foo"));
return message;
}
15 years
JBoss hornetq SVN: r8642 - in trunk: tests/src/org/hornetq/tests/unit/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 11:59:54 -0500 (Wed, 09 Dec 2009)
New Revision: 8642
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-241 - Changing TimedBuffer to stop spinning when there's no pending syncs
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-09 16:09:45 UTC (rev 8641)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-09 16:59:54 UTC (rev 8642)
@@ -143,7 +143,7 @@
if (timedBuffer != null)
{
// When moving to a new file, we need to make sure any pending buffer will be transfered to the buffer
- timedBuffer.flush(true);
+ timedBuffer.flush();
timedBuffer.setObserver(null);
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 16:09:45 UTC (rev 8641)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 16:59:54 UTC (rev 8642)
@@ -20,6 +20,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.HornetQBuffer;
@@ -46,13 +47,8 @@
private TimedBufferObserver bufferObserver;
- // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
- // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
- // prevent that
- private final Semaphore spinLimiter = new Semaphore(1);
+ private CheckTimer timer;
- private CheckTimer timerRunnable = new CheckTimer();
-
private final int bufferSize;
private final HornetQBuffer buffer;
@@ -125,9 +121,9 @@
return;
}
- timerRunnable = new CheckTimer();
+ timer = new CheckTimer();
- timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
+ timerThread = new Thread(timer, "hornetq-buffer-timeout");
timerThread.start();
@@ -152,10 +148,8 @@
bufferObserver = null;
- spinLimiter.release();
+ timer.close();
- timerRunnable.close();
-
if (logRates)
{
logRatesTimerTask.cancel();
@@ -238,15 +232,9 @@
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
{
+
delayFlush = false;
- if (buffer.writerIndex() == 0)
- {
- // More bytes have been added so the timer flush thread can resume
-
- spinLimiter.release();
- }
-
bytes.encode(buffer);
callbacks.add(callback);
@@ -265,22 +253,18 @@
// }
}
+ timer.resumeSpin();
}
- public void flush()
- {
- flush(false);
- }
-
/**
* force means the Journal is moving to a new file. Any pending write need to be done immediately
* or data could be lost
* */
- public void flush(final boolean force)
+ public synchronized void flush()
{
synchronized (this)
{
- if ((force || !delayFlush) && buffer.writerIndex() > 0)
+ if (buffer.writerIndex() > 0)
{
int pos = buffer.writerIndex();
@@ -301,17 +285,6 @@
bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
}
- try
- {
- // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
- // when the buffer is inactive
- spinLimiter.acquire();
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
-
lastFlushTime.set(System.nanoTime());
pendingSync = false;
@@ -323,6 +296,8 @@
bufferLimit = 0;
flushesDone.incrementAndGet();
+
+ timer.pauseSpin();
}
}
}
@@ -387,6 +362,58 @@
{
private volatile boolean closed = false;
+ private boolean spinning = false;
+
+ // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
+ // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
+ // prevent that
+ private final Semaphore spinLimiter = new Semaphore(1);
+
+ public CheckTimer()
+ {
+ if (!spinLimiter.tryAcquire())
+ {
+ // JDK would be screwed up if this was happening
+ throw new IllegalStateException("InternalError: Semaphore not working properly!");
+ }
+ spinning = false;
+ }
+
+ // Needs to be called within synchronized blocks on TimedBuffer
+ public void resumeSpin()
+ {
+ synchronized (TimedBuffer.this)
+ {
+ if (!spinning)
+ {
+ spinning = true;
+ spinLimiter.release();
+ }
+ }
+ }
+
+ // Needs to be called within synchronized blocks on TimedBuffer
+ public void pauseSpin()
+ {
+ synchronized (TimedBuffer.this)
+ {
+ if (spinning)
+ {
+ spinning = false;
+ try
+ {
+ if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Internal error on TimedBuffer. Can't stop spinning");
+ }
+ }
+ catch (InterruptedException ignored)
+ {
+ }
+ }
+ }
+ }
+
public void run()
{
while (!closed)
@@ -395,9 +422,22 @@
// timeout since the time of the last flush
// Effectively flushing "resets" the timer
- if (pendingSync && bufferObserver != null && System.nanoTime() > lastFlushTime.get() + timeout)
+ if (System.nanoTime() > lastFlushTime.get() + timeout)
{
- flush();
+ // delayFlush and pendingSync are changed inside synchronized blocks
+ // They need to be done atomically
+ synchronized (TimedBuffer.this)
+ {
+ if (!delayFlush && pendingSync && bufferObserver != null)
+ {
+ flush();
+ }
+ else if (!closed && !delayFlush)
+ {
+ // if delayFlush is set, it means we have to keep trying, we can't stop spinning on this case
+ pauseSpin();
+ }
+ }
}
try
@@ -417,6 +457,7 @@
public void close()
{
closed = true;
+ resumeSpin();
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-09 16:09:45 UTC (rev 8641)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-09 16:59:54 UTC (rev 8642)
@@ -41,6 +41,8 @@
// Attributes ----------------------------------------------------
+ private static final int ONE_SECOND = 1000000000; // in nanoseconds
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -85,13 +87,91 @@
}
}
- TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false); // Any big timeout
+ TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND, false);
- timedBuffer.setObserver(new TestObserver());
+ timedBuffer.start();
- int x = 0;
- for (int i = 0; i < 10; i++)
+ try
{
+
+ timedBuffer.setObserver(new TestObserver());
+
+ int x = 0;
+ for (int i = 0; i < 10; i++)
+ {
+ byte[] bytes = new byte[10];
+ for (int j = 0; j < 10; j++)
+ {
+ bytes[j] = UnitTestCase.getSamplebyte(x++);
+ }
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(bytes);
+
+ timedBuffer.checkSize(10);
+ timedBuffer.addBytes(buff, false, dummyCallback);
+ }
+
+ timedBuffer.checkSize(1);
+
+ Assert.assertEquals(1, flushTimes.get());
+
+ ByteBuffer flushedBuffer = buffers.get(0);
+
+ Assert.assertEquals(100, flushedBuffer.limit());
+
+ Assert.assertEquals(100, flushedBuffer.capacity());
+
+ flushedBuffer.rewind();
+
+ for (int i = 0; i < 100; i++)
+ {
+ Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+ }
+ }
+ finally
+ {
+ timedBuffer.stop();
+ }
+
+ }
+
+ public void testTimingAndFlush() throws Exception
+ {
+ final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ final AtomicInteger flushTimes = new AtomicInteger(0);
+ class TestObserver implements TimedBufferObserver
+ {
+ public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOAsyncTask> callbacks)
+ {
+ buffers.add(buffer);
+ flushTimes.incrementAndGet();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+ */
+ public ByteBuffer newBuffer(final int minSize, final int maxSize)
+ {
+ return ByteBuffer.allocate(maxSize);
+ }
+
+ public int getRemainingBytes()
+ {
+ return 1024 * 1024;
+ }
+ }
+
+ TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND / 10, false);
+
+ timedBuffer.start();
+
+ try
+ {
+
+ timedBuffer.setObserver(new TestObserver());
+
+ int x = 0;
+
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++)
{
@@ -102,23 +182,42 @@
timedBuffer.checkSize(10);
timedBuffer.addBytes(buff, false, dummyCallback);
- }
- timedBuffer.checkSize(1);
+ Thread.sleep(200);
- Assert.assertEquals(1, flushTimes.get());
+ Assert.assertEquals(0, flushTimes.get());
- ByteBuffer flushedBuffer = buffers.get(0);
+ bytes = new byte[10];
+ for (int j = 0; j < 10; j++)
+ {
+ bytes[j] = UnitTestCase.getSamplebyte(x++);
+ }
- Assert.assertEquals(100, flushedBuffer.limit());
+ buff = HornetQBuffers.wrappedBuffer(bytes);
- Assert.assertEquals(100, flushedBuffer.capacity());
+ timedBuffer.checkSize(10);
+ timedBuffer.addBytes(buff, true, dummyCallback);
- flushedBuffer.rewind();
+ Thread.sleep(500);
- for (int i = 0; i < 100; i++)
+ Assert.assertEquals(1, flushTimes.get());
+
+ ByteBuffer flushedBuffer = buffers.get(0);
+
+ Assert.assertEquals(20, flushedBuffer.limit());
+
+ Assert.assertEquals(20, flushedBuffer.capacity());
+
+ flushedBuffer.rewind();
+
+ for (int i = 0; i < 20; i++)
+ {
+ Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+ }
+ }
+ finally
{
- Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+ timedBuffer.stop();
}
}
15 years
JBoss hornetq SVN: r8641 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-09 11:09:45 -0500 (Wed, 09 Dec 2009)
New Revision: 8641
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
reverted r8634
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 14:39:46 UTC (rev 8640)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 16:09:45 UTC (rev 8641)
@@ -115,7 +115,7 @@
callbacks = new ArrayList<IOAsyncTask>();
- this.timeout = timeout;
+ this.timeout = timeout;
}
public synchronized void start()
@@ -137,15 +137,6 @@
logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
}
-
- //Need to start with the spin limiter acquired
- try
- {
- spinLimiter.acquire();
- }
- catch (InterruptedException ignore)
- {
- }
started = true;
}
@@ -249,6 +240,13 @@
{
delayFlush = false;
+ if (buffer.writerIndex() == 0)
+ {
+ // More bytes have been added so the timer flush thread can resume
+
+ spinLimiter.release();
+ }
+
bytes.encode(buffer);
callbacks.add(callback);
@@ -265,13 +263,6 @@
//
// flush();
// }
-
- if (buffer.writerIndex() == 0)
- {
- // More bytes have been added so the timer flush thread can resume
-
- spinLimiter.release();
- }
}
}
15 years
JBoss hornetq SVN: r8640 - trunk/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-09 09:39:46 -0500 (Wed, 09 Dec 2009)
New Revision: 8640
Modified:
trunk/src/main/org/hornetq/utils/Pair.java
trunk/src/main/org/hornetq/utils/SimpleString.java
Log:
SimpleString and Pair javadocs
Modified: trunk/src/main/org/hornetq/utils/Pair.java
===================================================================
--- trunk/src/main/org/hornetq/utils/Pair.java 2009-12-09 13:57:51 UTC (rev 8639)
+++ trunk/src/main/org/hornetq/utils/Pair.java 2009-12-09 14:39:46 UTC (rev 8640)
@@ -17,7 +17,7 @@
/**
*
- * A Pair
+ * A Pair is basically a holder for 2 objects.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
Modified: trunk/src/main/org/hornetq/utils/SimpleString.java
===================================================================
--- trunk/src/main/org/hornetq/utils/SimpleString.java 2009-12-09 13:57:51 UTC (rev 8639)
+++ trunk/src/main/org/hornetq/utils/SimpleString.java 2009-12-09 14:39:46 UTC (rev 8640)
@@ -20,20 +20,20 @@
import org.hornetq.core.logging.Logger;
/**
- *
- * A SimpleString
- *
* A simple String class that can store all characters, and stores as simple byte[],
- * this minimises expensive copying between String objects
+ * this minimises expensive copying between String objects.
+ *
+ * This object is used heavily throughout HornetQ for performance reasons.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * TODO - implement an intern() method like in java.lang.String, since many Strings e.g. addresses, queue names, remote node ids are duplicated heavily
- * in bindings taking up more memory than they should
- * Intern can be called when receiving a sent message at the server (destination)
- * Also when receiving bindings remotely via bridge, the address, queue name and node id can be interned
*
*/
+
+ // TODO - implement an intern() method like in java.lang.String, since many Strings e.g. addresses, queue names, remote node ids are duplicated heavily
+ //in bindings taking up more memory than they should
+ //Intern can be called when receiving a sent message at the server (destination)
+ //Also when receiving bindings remotely via bridge, the address, queue name and node id can be interned
+ //
public class SimpleString implements CharSequence, Serializable, Comparable<SimpleString>
{
private static final long serialVersionUID = 4204223851422244307L;
@@ -67,7 +67,10 @@
// Constructors
// ----------------------------------------------------------------------
-
+ /**
+ * creates a SimpleString from a conventional String
+ * @param string the string to transform
+ */
public SimpleString(final String string)
{
int len = string.length();
@@ -92,6 +95,10 @@
str = string;
}
+ /**
+ * creates a SimpleString from a byte array
+ * @param data the byte array to use
+ */
public SimpleString(final byte[] data)
{
this.data = data;
@@ -145,11 +152,20 @@
// Public
// ---------------------------------------------------------------------------
+ /**
+ * returns the underlying byte array of this SimpleString
+ * @return the byte array
+ */
public byte[] getData()
{
return data;
}
+ /**
+ * returns true if the SimpleString parameter starts with the same data as this one. false if not.
+ * @param other the SimpleString to look for
+ * @return true if this SimpleString starts with the same data
+ */
public boolean startsWith(final SimpleString other)
{
byte[] otherdata = other.data;
@@ -240,6 +256,13 @@
return hash;
}
+ /**
+ * splits this SimpleString into an array of SimpleString using the char param as the delimiter.
+ *
+ * i.e. "a.b" would return "a" and "b" if . was the delimiter
+ * @param delim
+ * @return
+ */
public SimpleString[] split(final char delim)
{
if (!contains(delim))
@@ -270,6 +293,12 @@
}
}
+ /**
+ * checks to see if this SimpleString contains the char parameter passed in
+ *
+ * @param c the char to check for
+ * @return true if the char is found, false otherwise.
+ */
public boolean contains(final char c)
{
for (int i = 0; i < data.length; i += 2)
@@ -284,11 +313,23 @@
return false;
}
+ /**
+ * concatenates a SimpleString and a String
+ *
+ * @param toAdd the String to concatenate with.
+ * @return the concatenated SimpleString
+ */
public SimpleString concat(final String toAdd)
{
return concat(new SimpleString(toAdd));
}
+ /**
+ * concatenates 2 SimpleStrings
+ *
+ * @param toAdd the SimpleString to concatenate with.
+ * @return the concatenated SimpleString
+ */
public SimpleString concat(final SimpleString toAdd)
{
byte[] bytes = new byte[data.length + toAdd.getData().length];
@@ -297,6 +338,12 @@
return new SimpleString(bytes);
}
+ /**
+ * concatenates a SimpleString and a char
+ *
+ * @param c the char to concatenate with.
+ * @return the concatenated SimpleString
+ */
public SimpleString concat(final char c)
{
byte[] bytes = new byte[data.length + 2];
@@ -306,16 +353,30 @@
return new SimpleString(bytes);
}
+ /**
+ * returns the size of this SimpleString
+ * @return the size
+ */
public int sizeof()
{
return DataConstants.SIZE_INT + data.length;
}
+ /**
+ * returns the size of a SimpleString
+ * @param str the SimpleString to check
+ * @return the size
+ */
public static int sizeofString(final SimpleString str)
{
return str.sizeof();
}
+ /**
+ * returns the size of a SimpleString which could be null
+ * @param str the SimpleString to check
+ * @return the size
+ */
public static int sizeofNullableString(final SimpleString str)
{
if (str == null)
@@ -328,6 +389,13 @@
}
}
+ /**
+ *
+ * @param srcBegin
+ * @param srcEnd
+ * @param dst
+ * @param dstBegin
+ */
public void getChars(final int srcBegin, final int srcEnd, final char dst[], final int dstBegin)
{
if (srcBegin < 0)
15 years
JBoss hornetq SVN: r8639 - in trunk/src/main/org/hornetq/core: client and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-09 08:57:51 -0500 (Wed, 09 Dec 2009)
New Revision: 8639
Modified:
trunk/src/main/org/hornetq/core/buffers/package-info.java
trunk/src/main/org/hornetq/core/client/ClientConsumer.java
trunk/src/main/org/hornetq/core/client/ClientRequestor.java
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
Log:
HORNETQ-186: fill in Javadocs for core API
* break javadoc paragraphs
Modified: trunk/src/main/org/hornetq/core/buffers/package-info.java
===================================================================
--- trunk/src/main/org/hornetq/core/buffers/package-info.java 2009-12-09 13:30:12 UTC (rev 8638)
+++ trunk/src/main/org/hornetq/core/buffers/package-info.java 2009-12-09 13:57:51 UTC (rev 8639)
@@ -13,10 +13,10 @@
/**
* HornetQ Buffering.
- *
+ * <br>
* This package defines the buffers used by HornetQ. The underlying implementations uses
* Netty's ChannelBuffer and wraps it with methods required by HornetQ usage.
- *
+ * <br>
* ChannelBuffer differs from {@link java.nio.ByteBuffer} in two ways:
* <ol>
* <li>it is possible to interface almost directly with byte arrays, what is faster</li>
Modified: trunk/src/main/org/hornetq/core/client/ClientConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientConsumer.java 2009-12-09 13:30:12 UTC (rev 8638)
+++ trunk/src/main/org/hornetq/core/client/ClientConsumer.java 2009-12-09 13:57:51 UTC (rev 8639)
@@ -17,11 +17,11 @@
/**
* A ClientConsumer receives messages from HornetQ queues.
- *
+ * <br>
* Messages can be consumed synchronously by using the <code>receive()</code> methods
* which will block until a message is received (or a timeout expires) or asynchronously
* by setting a {@link MessageHandler}.
- *
+ * <br>
* These 2 types of consumption are exclusive: a ClientConsumer with a MessageHandler set will
* throw HornetQException if its <code>receive()</code> methods are called.
*
Modified: trunk/src/main/org/hornetq/core/client/ClientRequestor.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientRequestor.java 2009-12-09 13:30:12 UTC (rev 8638)
+++ trunk/src/main/org/hornetq/core/client/ClientRequestor.java 2009-12-09 13:57:51 UTC (rev 8639)
@@ -19,7 +19,7 @@
/**
* The ClientRequestor class helps making requests.
- *
+ * <br>
* The ClientRequestor constructor is given a ClientSession and a request address.
* It creates a temporary queue for the responses and provides a request method that sends the request message and waits for its reply.
*
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-09 13:30:12 UTC (rev 8638)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-09 13:57:51 UTC (rev 8639)
@@ -22,7 +22,7 @@
/**
* A ClientSessionFactory is the entry point to create and configure HornetQ resources to produce and consume messages.
- *
+ * <br>
* It is possible to configure a factory using the setter methods only if no session has been created.
* Once a session is created, the configuration is fixed and any call to a setter method will throw a IllegalStateException.
*
15 years
JBoss hornetq SVN: r8638 - trunk/src/main/org/hornetq/core/config.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-09 08:30:12 -0500 (Wed, 09 Dec 2009)
New Revision: 8638
Modified:
trunk/src/main/org/hornetq/core/config/TransportConfiguration.java
Log:
ClientProducer javadocs
Modified: trunk/src/main/org/hornetq/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/TransportConfiguration.java 2009-12-09 11:48:23 UTC (rev 8637)
+++ trunk/src/main/org/hornetq/core/config/TransportConfiguration.java 2009-12-09 13:30:12 UTC (rev 8638)
@@ -20,10 +20,20 @@
import org.hornetq.utils.UUIDGenerator;
/**
- * A TransportConfiguration
- *
+ * A TransportConfiguration is used by a client to specify a connection to a server and its backup if one exists.<br><br>
+ * <p/>
+ * Typically the constructors take the class name and parameters needed to create the connection. These will be
+ * different depending on what connector is being used, i.e. Netty or InVM etc. An example:<br><br>
+ * <p/>
+ * <code>
+ * HashMap&lt;String, Object&gt; map = new HashMap&lt;String, Object&gt;();<br>
+ * map.put("host", "localhost");<br>
+ * map.put("port", 5445);<br>
+ * TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName(), map); <br>
+ * ClientSessionFactory sf = new ClientSessionFactoryImpl(config); <br>
+ * </code><br><br>
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
*/
public class TransportConfiguration implements Serializable
{
@@ -43,6 +53,12 @@
private static final byte TYPE_STRING = 3;
+ /**
+ * Utility method for splitting a comma separated list of hosts
+ *
+ * @param commaSeparatedHosts the comma separated host string
+ * @return the hosts
+ */
public static String[] splitHosts(final String commaSeparatedHosts)
{
if (commaSeparatedHosts == null)
@@ -58,10 +74,21 @@
return hosts;
}
+ /**
+ * Creates a default TransportConfiguration with no configured transport.
+ */
public TransportConfiguration()
{
}
+ /**
+ * Creates a TransportConfiguration with a specific name providing the class name of the {@link org.hornetq.core.remoting.spi.ConnectorFactory}
+ * and any params needed.
+ *
+ * @param className The class name of the ConnectorFactory
+ * @param params The params needed by the ConnectorFactory
+ * @param name The name of this TransportConfiguration
+ */
public TransportConfiguration(final String className, final Map<String, Object> params, final String name)
{
factoryClassName = className;
@@ -71,26 +98,53 @@
this.name = name;
}
+ /**
+ * Creates a TransportConfiguration providing the class name of the {@link org.hornetq.core.remoting.spi.ConnectorFactory}
+ * and any params needed.
+ *
+ * @param className The class name of the ConnectorFactory
+ * @param params The params needed by the ConnectorFactory
+ */
public TransportConfiguration(final String className, final Map<String, Object> params)
{
this(className, params, UUIDGenerator.getInstance().generateStringUUID());
}
+ /**
+ * Creates a TransportConfiguration providing the class name of the {@link org.hornetq.core.remoting.spi.ConnectorFactory}
+ *
+ * @param className The class name of the ConnectorFactory
+ */
public TransportConfiguration(final String className)
{
this(className, new HashMap<String, Object>(), UUIDGenerator.getInstance().generateStringUUID());
}
+ /**
+ * Returns the name of this TransportConfiguration.
+ *
+ * @return the name
+ */
public String getName()
{
return name;
}
+ /**
+ * Returns the class name of ConnectorFactory being used by this TransportConfiguration
+ *
+ * @return The classname
+ */
public String getFactoryClassName()
{
return factoryClassName;
}
+ /**
+ * Returns any params set for this TransportConfiguration
+ *
+ * @return the params
+ */
public Map<String, Object> getParams()
{
return params;
@@ -110,7 +164,7 @@
return false;
}
- TransportConfiguration kother = (TransportConfiguration)other;
+ TransportConfiguration kother = (TransportConfiguration) other;
if (factoryClassName.equals(kother.factoryClassName))
{
@@ -184,6 +238,13 @@
return str.toString();
}
+ /**
+ * Encodes this TransportConfiguration into a buffer.
+ * <p/>
+ * Note that this is only used internally by HornetQ.
+ *
+ * @param buffer the buffer to encode into
+ */
public void encode(final HornetQBuffer buffer)
{
buffer.writeString(name);
@@ -202,22 +263,22 @@
if (val instanceof Boolean)
{
buffer.writeByte(TransportConfiguration.TYPE_BOOLEAN);
- buffer.writeBoolean((Boolean)val);
+ buffer.writeBoolean((Boolean) val);
}
else if (val instanceof Integer)
{
buffer.writeByte(TransportConfiguration.TYPE_INT);
- buffer.writeInt((Integer)val);
+ buffer.writeInt((Integer) val);
}
else if (val instanceof Long)
{
buffer.writeByte(TransportConfiguration.TYPE_LONG);
- buffer.writeLong((Long)val);
+ buffer.writeLong((Long) val);
}
else if (val instanceof String)
{
buffer.writeByte(TransportConfiguration.TYPE_STRING);
- buffer.writeString((String)val);
+ buffer.writeString((String) val);
}
else
{
@@ -227,6 +288,13 @@
}
}
+ /**
+ * Decodes this TransportConfiguration from a buffer.
+ * <p/>
+ * Note this is only used internally by HornetQ
+ *
+ * @param buffer the buffer to decode from
+ */
public void decode(final HornetQBuffer buffer)
{
name = buffer.readString();
15 years