JBoss hornetq SVN: r10114 - in trunk/docs/eap-manual/en: diagrams and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-01-11 13:17:01 -0500 (Tue, 11 Jan 2011)
New Revision: 10114
Added:
trunk/docs/eap-manual/en/diagrams/
trunk/docs/eap-manual/en/diagrams/ha-topologies1.odg
trunk/docs/eap-manual/en/images/
trunk/docs/eap-manual/en/images/simple-colocated.jpg
trunk/docs/eap-manual/en/images/simple-colocated2.jpg
trunk/docs/eap-manual/en/images/simple-colocated3.jpg
trunk/docs/eap-manual/en/images/simple-dedicated-jca-remote.jpg
trunk/docs/eap-manual/en/images/simple-dedicated-jca.jpg
trunk/docs/eap-manual/en/images/simple-dedicated.jpg
Modified:
trunk/docs/eap-manual/en/clusters.xml
Log:
updated documentation
Modified: trunk/docs/eap-manual/en/clusters.xml
===================================================================
--- trunk/docs/eap-manual/en/clusters.xml 2011-01-10 22:52:30 UTC (rev 10113)
+++ trunk/docs/eap-manual/en/clusters.xml 2011-01-11 18:17:01 UTC (rev 10114)
@@ -39,18 +39,39 @@
EAP instances this would mean that each EAP instance would have a live server and 1 backup server as in
diagram1.
</para>
- <para>todo add image</para>
<para>
+ <graphic fileref="images/simple-colocated.jpg" align="center" format="jpg" scale="30"/>
+ </para>
+ <para>
+ Here the continuous lines show the state of the cluster before failover and the dotted lines show it after
+ failover has occurred. To start with, the 2 live servers are connected, forming a cluster, with each live server
+ connected to its local applications (via JCA). Remote clients are also connected to the live servers. After
+ failover the backup connects to the still available live server (which happens to be in the same JVM) and takes
+ over as the live server in the cluster. Any remote clients also fail over.
+ </para>
+ <para>
+ One thing to mention is that, depending on what consumers/producers, MDBs etc. are available, messages
+ will be distributed between the nodes to make sure that all clients are satisfied from a JMS perspective. That is,
+ if a producer is sending messages to a queue on a backup server that has no consumers, the messages will be
+ distributed to a live node elsewhere.
+ </para>
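This redistribution behaviour is controlled through HornetQ's address settings. A minimal sketch, assuming a catch-all match pattern and a zero delay are acceptable for your addresses (both values here are illustrative, not taken from the shipped configuration):

```xml
<!-- hornetq-configuration.xml: let messages queued on a node with no
     consumers be redistributed to other cluster nodes immediately.
     The "#" match and the 0 ms delay are illustrative choices. -->
<address-settings>
   <address-setting match="#">
      <redistribution-delay>0</redistribution-delay>
   </address-setting>
</address-settings>
```

A delay of -1 (the default) disables redistribution entirely, so a non-negative value is needed for the behaviour described above.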
+ <para>
+ The following diagram is slightly more complex but shows the same configuration with 3 servers. Note that the
+ cluster connections have been removed to make the configuration clearer, but in reality all live servers will
+ form a cluster.
+ </para>
+ <para>
+ <graphic fileref="images/simple-colocated2.jpg" align="center" format="jpg" scale="30"/>
+ </para>
+ <para>
With more than 2 servers it is up to the user how many backups per live server are configured; you can
have
as many backups as required, but usually 1 will suffice. In a 3 node topology you may have each EAP instance
configured
- with 2 backups, 1 for each of the other live servers, or you may just want to have 1 backup for each live.
+ with 2 backups, in a 4 node topology 3 backups, and so on. The following diagram demonstrates this.
</para>
<para>
- The reason for having the backup server colocated is so they work with MDB's, when a back up server comes ive
- it forwards any messages to the live server who deals with them in the normal fashion. If your application was
- pure JMS you could, if chosen, use a dedicated backup server.
+ <graphic fileref="images/simple-colocated3.jpg" align="center" format="jpg" scale="30"/>
</para>
<section>
<title>Configuration</title>
@@ -444,10 +465,100 @@
<section>
<title>Dedicated Live and Backup in Symmetrical cluster</title>
<para>
- In reality the configuration for this is exactly the same as in the previous section, the only difference is
- that a backup will reside on an eap instance of its own. of course this means that the eap instance is passive
- and not used until the backup comes live and is only really useful for pure JMS applications.
+ In reality the configuration for this is exactly the same as for the backup server in the previous section; the only
+ difference is that the backup will reside on an EAP instance of its own rather than being colocated with another live server.
+ Of course this means that the EAP instance is passive and not used until the backup comes live, and so it is only
+ really useful for pure JMS applications.
</para>
+ <para>The following diagram shows a possible configuration for this:</para>
+ <para>
+ <graphic fileref="images/simple-dedicated.jpg" align="center" format="jpg" scale="30"/>
+ </para>
+ <para>
+ Here you can see how this works with remote JMS clients. Once failover occurs, the HornetQ backup server
+ running within another EAP instance takes over as live.
+ </para>
+ <para>
+ This is fine for applications that are pure JMS and have no JEE components such as MDBs. If you are using
+ JEE components then there are 2 ways that this can be done. The first is shown in the following diagram:
+ </para>
+ <para>
+ <graphic fileref="images/simple-dedicated-jca.jpg" align="center" format="jpg" scale="30"/>
+ </para>
+ <para>
+ Because there is no live HornetQ server running by default in the EAP instance running the backup server, it
+ makes no sense to host any applications in it. However, you can host applications on the server running the live
+ HornetQ server. If a live HornetQ server fails then remote JMS clients will fail over as previously
+ explained, but what happens to any messages meant for or sent from JEE components? When the backup comes
+ live, messages will be distributed to and from the backup server over HornetQ cluster connections and handled
+ appropriately.
+ </para>
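The cluster connections mentioned here are ordinary HornetQ cluster-connection definitions; a minimal sketch (the connection name, address prefix and discovery group reference below are illustrative placeholders) might look like:

```xml
<!-- hornetq-configuration.xml: a cluster connection over which messages
     are moved between the backup and the remaining live servers.
     Names and the discovery group reference are illustrative. -->
<cluster-connections>
   <cluster-connection name="my-cluster">
      <address>jms</address>
      <connector-ref>netty-connector</connector-ref>
      <discovery-group-ref discovery-group-name="dg-group1"/>
   </cluster-connection>
</cluster-connections>
```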
+ <para>
+ The second way to do this is to have both live and backup servers remote from the EAP instance, as shown in the
+ following diagram.
+ </para>
+ <para>
+ <graphic fileref="images/simple-dedicated-jca-remote.jpg" align="center" format="jpg" scale="30"/>
+ </para>
+ <para>
+ Here you can see that all the applications (via JCA) will be serviced by a HornetQ server in its own EAP instance.
+ </para>
+ <section>
+ <title>Configuration of dedicated Live and backup</title>
+ <para>
+ The live server configuration is exactly the same as in the previous example. The only difference, of course,
+ is that there is no backup in the EAP instance.
+ </para>
+ <para>
+ For the backup server the <literal>hornetq-configuration.xml</literal> is unchanged, however since there is
+ no live server we need to make sure that the <literal>hornetq-jboss-beans.xml</literal> instantiates all
+ the beans needed. For this simply use the same configuration as in the live server changing only the
+ location of the <literal>hornetq-configuration.xml</literal> parameter for the <literal>Configuration</literal>
+ bean.
+ </para>
+ <para>
+ As before there will be no <literal>hornetq-jms.xml</literal> or <literal>jms-ds.xml</literal> configuration.
+ </para>
+ <para>
+ If you want both HornetQ servers to be in their own dedicated servers, remote from the applications
+ as in the last diagram, then simply edit the <literal>jms-ds.xml</literal> and change the following lines to:
+ </para>
+ <programlisting>
+ <config-property name="ConnectorClassName" type="java.lang.String">org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property>
+ <config-property name="ConnectionParameters" type="java.lang.String">host=127.0.0.1;port=5446</config-property>
+ </programlisting>
+ <para>
+ This will change the outbound JCA connector. To configure the inbound connector for MDBs, edit the
+ <literal>ra.xml</literal> config file and change the following parameters.
+ </para>
+ <programlisting>
+ <config-property>
+ <description>The transport type</description>
+ <config-property-name>ConnectorClassName</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property-value>
+ </config-property>
+ <config-property>
+ <description>The transport configuration. These values must be in the form of key=val;key=val;</description>
+ <config-property-name>ConnectionParameters</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>host=127.0.0.1;port=5446</config-property-value>
+ </config-property>
+ </programlisting>
+ <para>
+ In both cases the host and port should match those of your live server. If you are using discovery then set the
+ appropriate parameters, <literal>DiscoveryAddress</literal> and <literal>DiscoveryPort</literal>, to match
+ your configured broadcast groups.
+ </para>
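For the discovery case, the same config-property format applies; a hedged sketch for ra.xml (the property types and the 231.7.7.7/9876 values are assumptions that must match your broadcast group configuration):

```xml
<!-- ra.xml: discovery-based alternative to a fixed host/port.
     The address and port must match your broadcast group; the
     values here are illustrative. -->
<config-property>
   <description>The discovery group address</description>
   <config-property-name>DiscoveryAddress</config-property-name>
   <config-property-type>java.lang.String</config-property-type>
   <config-property-value>231.7.7.7</config-property-value>
</config-property>
<config-property>
   <description>The discovery group port</description>
   <config-property-name>DiscoveryPort</config-property-name>
   <config-property-type>java.lang.Integer</config-property-type>
   <config-property-value>9876</config-property-value>
</config-property>
```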
+ </section>
+ <section>
+ <title>Running the shipped example</title>
+ <para>
+ EAP ships with an example configuration for this topology. Look under
+ <literal>extras/hornetq/resources/examples/cluster-with-dedicated-backup</literal>
+ and follow the readme.
+ </para>
+ </section>
</section>
</section>
</chapter>
\ No newline at end of file
Added: trunk/docs/eap-manual/en/diagrams/ha-topologies1.odg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/eap-manual/en/diagrams/ha-topologies1.odg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/eap-manual/en/images/simple-colocated.jpg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/eap-manual/en/images/simple-colocated.jpg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/eap-manual/en/images/simple-colocated2.jpg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/eap-manual/en/images/simple-colocated2.jpg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/eap-manual/en/images/simple-colocated3.jpg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/eap-manual/en/images/simple-colocated3.jpg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/eap-manual/en/images/simple-dedicated-jca-remote.jpg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/eap-manual/en/images/simple-dedicated-jca-remote.jpg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/eap-manual/en/images/simple-dedicated-jca.jpg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/eap-manual/en/images/simple-dedicated-jca.jpg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/eap-manual/en/images/simple-dedicated.jpg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/eap-manual/en/images/simple-dedicated.jpg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
JBoss hornetq SVN: r10113 - in trunk/src/main/org/hornetq: utils and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-10 17:52:30 -0500 (Mon, 10 Jan 2011)
New Revision: 10113
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
Log:
fixing tests
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-10 20:30:22 UTC (rev 10112)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-10 22:52:30 UTC (rev 10113)
@@ -468,10 +468,11 @@
// We won't know the real size of the message since we are compressing while reading the streaming.
// This counter will be passed to the deflater to be updated for every byte read
+ AtomicLong messageSize = new AtomicLong();
if (session.isCompressLargeMessages())
{
- input = new DeflaterReader(inputStreamParameter);
+ input = new DeflaterReader(inputStreamParameter, messageSize);
}
long totalSize = 0;
@@ -517,13 +518,18 @@
if (lastPacket)
{
+ if (!session.isCompressLargeMessages())
+ {
+ messageSize.set(totalSize);
+ }
+
byte[] buff2 = new byte[pos];
System.arraycopy(buff, 0, buff2, 0, pos);
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, totalSize);
+ chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
}
else
{
Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-10 20:30:22 UTC (rev 10112)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-10 22:52:30 UTC (rev 10113)
@@ -41,11 +41,6 @@
input = inData;
this.bytesRead = bytesRead;
}
-
- public DeflaterReader(final InputStream inData)
- {
- this(inData, null);
- }
@Override
public int read() throws IOException
JBoss hornetq SVN: r10112 - in trunk: src/main/org/hornetq/core/client/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-10 15:30:22 -0500 (Mon, 10 Jan 2011)
New Revision: 10112
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
Log:
JBOAOO-5595 - Fixing large message send on file-size > 32bits integers
Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-10 20:30:22 UTC (rev 10112)
@@ -49,10 +49,8 @@
// This may take some considerable time to create, send and consume - if it takes too long or you don't have
// enough disk space just reduce the file size here
- private final long FILE_SIZE = 256L * 1024 * 1024;
+ private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
- //private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
-
@Override
public boolean runExample() throws Exception
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-01-10 20:30:22 UTC (rev 10112)
@@ -468,14 +468,13 @@
// We won't know the real size of the message since we are compressing while reading the streaming.
// This counter will be passed to the deflater to be updated for every byte read
- AtomicLong messageSize = new AtomicLong();
if (session.isCompressLargeMessages())
{
- input = new DeflaterReader(inputStreamParameter, messageSize);
+ input = new DeflaterReader(inputStreamParameter);
}
- int totalSize = 0;
+ long totalSize = 0;
while (!lastPacket)
{
@@ -518,18 +517,13 @@
if (lastPacket)
{
- if (!session.isCompressLargeMessages())
- {
- messageSize.set(totalSize);
- }
-
byte[] buff2 = new byte[pos];
System.arraycopy(buff, 0, buff2, 0, pos);
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+ chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, totalSize);
}
else
{
Modified: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-08 04:28:52 UTC (rev 10111)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2011-01-10 20:30:22 UTC (rev 10112)
@@ -41,6 +41,11 @@
input = inData;
this.bytesRead = bytesRead;
}
+
+ public DeflaterReader(final InputStream inData)
+ {
+ this(inData, null);
+ }
@Override
public int read() throws IOException
@@ -103,7 +108,10 @@
}
else
{
- bytesRead.addAndGet(m);
+ if (bytesRead != null)
+ {
+ bytesRead.addAndGet(m);
+ }
deflater.setInput(readBuffer, 0, m);
}
}
JBoss hornetq SVN: r10111 - in trunk: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-07 23:28:52 -0500 (Fri, 07 Jan 2011)
New Revision: 10111
Modified:
trunk/build-maven.xml
trunk/hornetq-rest/pom.xml
Log:
upload QA release
Modified: trunk/build-maven.xml
===================================================================
--- trunk/build-maven.xml 2011-01-07 16:58:16 UTC (rev 10110)
+++ trunk/build-maven.xml 2011-01-08 04:28:52 UTC (rev 10111)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.QA-10090"/>
+ <property name="hornetq.version" value="2.2.0.QA-10111"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: trunk/hornetq-rest/pom.xml
===================================================================
--- trunk/hornetq-rest/pom.xml 2011-01-07 16:58:16 UTC (rev 10110)
+++ trunk/hornetq-rest/pom.xml 2011-01-08 04:28:52 UTC (rev 10111)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.0.QA-10090</hornetq.version>
+ <hornetq.version>2.2.0.QA-10111</hornetq.version>
</properties>
<licenses>
JBoss hornetq SVN: r10110 - trunk/docs/eap-manual/en.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-01-07 11:58:16 -0500 (Fri, 07 Jan 2011)
New Revision: 10110
Modified:
trunk/docs/eap-manual/en/clusters.xml
Log:
updated documentation
Modified: trunk/docs/eap-manual/en/clusters.xml
===================================================================
--- trunk/docs/eap-manual/en/clusters.xml 2011-01-07 16:49:22 UTC (rev 10109)
+++ trunk/docs/eap-manual/en/clusters.xml 2011-01-07 16:58:16 UTC (rev 10110)
@@ -54,329 +54,391 @@
</para>
<section>
<title>Configuration</title>
- <para>
- First lets start with the configuration of the live server, we will use the EAP 'all' configuration as
- our starting point. Since this version only supports shared store for failover we need to configure this in the
- <literal>hornetq-configuration.xml</literal>
- file like so:
- </para>
- <programlisting>
- <shared-store>true</shared-store>
- </programlisting>
- <para>
- Obviously this means that the location of the journal files etc will have to be configured to be some
- where
- where
- this lives backup can access. You may change the lives configuration in
- <literal>hornetq-configuration.xml</literal>
- to
- something like:
- </para>
- <programlisting>
- <large-messages-directory>/media/shared/data/large-messages</large-messages-directory>
- <bindings-directory>/media/shared/data/bindings</bindings-directory>
- <journal-directory>/media/shared/data/journal</journal-directory>
- <paging-directory>/media/shared/data/paging</paging-directory>
- </programlisting>
- <para>
- How these paths are configured will of course depend on your network settings or file system.
- </para>
- <para>
- Now we need to configure how remote JMS clients will behave if the server is shutdown in a normal
- fashion.
- By
- default
- Clients will not failover if the live server is shutdown. Depending on there connection factory settings
- they will either fail or try to reconnect to the live server.
- </para>
- <para>If you want clients to failover on a normal server shutdown the you must configure the
- <literal>failover-on-shutdown</literal>
- flag to true in the
- <literal>hornetq-configuration.xml</literal>
- file like so:
- </para>
- <programlisting>
- <failover-on-shutdown>false</failover-on-shutdown>
- </programlisting>
- <para>Don't worry if you have this set to false (which is the default) but still want failover to occur,
- simply
- kill
- the
- server process directly or call
- <literal>forceFailover</literal>
- via jmx or the admin console on the core server object.
- </para>
- <para>
- No lets look at how to create and configure a backup server on the same node, lets assume that this
- backups
- live
- server is configured identically to the live server on this node for simplicities sake.
- </para>
- <para>
- Firstly we need to define a new HornetQ Server that EAP will deploy. We do this by creating a new
- <literal>hornetq-jboss-beans.xml</literal>
- configuration. We will place this under a new directory
- <literal>hornetq-backup1</literal>
- which will need creating
- in the
- <literal>deploy</literal>
- directory but in reality it doesn't matter where this is put. This will look like:
- </para>
- <programlisting>
- <?xml version="1.0" encoding="UTF-8"?>
+ <section>
+ <title>Live Server Configuration</title>
+ <para>
+ First let's start with the configuration of the live server; we will use the EAP 'all' configuration as
+ our starting point. Since this version only supports shared store for failover, we need to configure
+ this in the
+ <literal>hornetq-configuration.xml</literal>
+ file like so:
+ </para>
+ <programlisting>
+ <shared-store>true</shared-store>
+ </programlisting>
+ <para>
+ Obviously this means that the location of the journal files etc. will have to be configured to be
+ somewhere
+ that
+ this live server's backup can access. You may change the live server's configuration in
+ <literal>hornetq-configuration.xml</literal>
+ to
+ something like:
+ </para>
+ <programlisting>
+ <large-messages-directory>/media/shared/data/large-messages</large-messages-directory>
+ <bindings-directory>/media/shared/data/bindings</bindings-directory>
+ <journal-directory>/media/shared/data/journal</journal-directory>
+ <paging-directory>/media/shared/data/paging</paging-directory>
+ </programlisting>
+ <para>
+ How these paths are configured will of course depend on your network settings or file system.
+ </para>
+ <para>
+ Now we need to configure how remote JMS clients will behave if the server is shut down in a normal
+ fashion.
+ By
+ default,
+ clients will not fail over if the live server is shut down. Depending on their connection factory
+ settings,
+ they will either fail or try to reconnect to the live server.
+ </para>
+ <para>If you want clients to fail over on a normal server shutdown then you must configure the
+ <literal>failover-on-shutdown</literal>
+ flag to true in the
+ <literal>hornetq-configuration.xml</literal>
+ file like so:
+ </para>
+ <programlisting>
+ <failover-on-shutdown>true</failover-on-shutdown>
+ </programlisting>
+ <para>Don't worry if you have this set to false (which is the default) but still want failover to occur;
+ simply kill the
+ server process directly or call
+ <literal>forceFailover</literal>
+ via JMX or the admin console on the core server object.
+ </para>
+ <para>We also need to configure the connection factories used by the client to be HA. This is done by
+ adding
+ certain attributes to the connection factories in <literal>hornetq-jms.xml</literal>. Let's look at an
+ example:
+ </para>
+ <programlisting>
+ <connection-factory name="NettyConnectionFactory">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/ConnectionFactory"/>
+ <entry name="/XAConnectionFactory"/>
+ </entries>
- <deployment xmlns="urn:jboss:bean-deployer:2.0">
+ <ha>true</ha>
+ <!-- Pause 1 second between connect attempts -->
+ <retry-interval>1000</retry-interval>
- <!-- The core configuration -->
- <bean name="BackupConfiguration" class="org.hornetq.core.config.impl.FileConfiguration">
- <property name="configurationUrl">${jboss.server.home.url}/deploy/hornetq-backup1/hornetq-configuration.xml</property>
- </bean>
+ <!-- Multiply subsequent reconnect pauses by this multiplier. This can be used to
+ implement an exponential back-off. For our purposes we just set to 1.0 so each reconnect
+ pause is the same length -->
+ <retry-interval-multiplier>1.0</retry-interval-multiplier>
+ <!-- Try reconnecting an unlimited number of times (-1 means "unlimited") -->
+ <reconnect-attempts>-1</reconnect-attempts>
+ </connection-factory>
- <!-- The core server -->
- <bean name="BackupHornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
- <constructor>
- <parameter>
- <inject bean="BackupConfiguration"/>
- </parameter>
- <parameter>
- <inject bean="MBeanServer"/>
- </parameter>
- <parameter>
- <inject bean="HornetQSecurityManager"/>
- </parameter>
- </constructor>
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
+ </programlisting>
+ <para>We have added the following attributes to the connection factory used by the client:</para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>ha</literal>
+ - This tells the client it supports HA and must always be true for failover
+ to occur
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>retry-interval</literal>
+ - this is how long the client will wait between unsuccessful
+ reconnect attempts to the server
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>retry-interval-multiplier</literal>
+ - is used to configure an exponential back off for
+ reconnect attempts
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>reconnect-attempts</literal>
+ - how many reconnect attempts a client should make before failing;
+ -1 means unlimited.
+ </para>
+ </listitem>
+ </itemizedlist>
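Since the listing above sets the multiplier to 1.0, every pause is the same length. As a hypothetical variant, a true exponential back-off could look like this (the values, and the <literal>max-retry-interval</literal> cap, are illustrative):

```xml
<!-- hornetq-jms.xml: exponential back-off variant. Pauses grow
     1s, 2s, 4s, ... and are capped at 30s. Values are illustrative. -->
<ha>true</ha>
<retry-interval>1000</retry-interval>
<retry-interval-multiplier>2.0</retry-interval-multiplier>
<max-retry-interval>30000</max-retry-interval>
<reconnect-attempts>-1</reconnect-attempts>
```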
+ </section>
+ <section>
+ <title>Backup Server Configuration</title>
+ <para>
+ Now let's look at how to create and configure a backup server on the same EAP instance. This runs
+ on the same EAP instance as the live server from the previous section but is configured as the backup
+ for a live server running on a different EAP instance.
+ </para>
+ <para>
+ The first thing to mention is that the backup only needs a <literal>hornetq-jboss-beans.xml</literal>
+ and a <literal>hornetq-configuration.xml</literal> configuration file. This is because any JMS components
+ are created from the Journal when the backup server becomes live.
+ </para>
+ <para>
+ Firstly we need to define a new HornetQ Server that EAP will deploy. We do this by creating a new
+ <literal>hornetq-jboss-beans.xml</literal>
+ configuration. We will place this under a new directory,
+ <literal>hornetq-backup1</literal>,
+ which will need to be created
+ in the
+ <literal>deploy</literal>
+ directory, though in reality it doesn't matter where this is put. This will look like:
+ </para>
+ <programlisting>
+ <?xml version="1.0" encoding="UTF-8"?>
- <!-- The JMS server -->
- <bean name="BackupJMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
- <constructor>
- <parameter>
- <inject bean="BackupHornetQServer"/>
- </parameter>
- </constructor>
- </bean>
+ <deployment xmlns="urn:jboss:bean-deployer:2.0">
- </deployment>
- </programlisting>
- <para>
- The first thing to notice is the BackupConfiguration bean. This is configured to pick up the
- configuration
- for
- the
- server which we will place in the same directory.
- </para>
- <para>
- After that we just configure a new HornetQ Server and JMS server.
- </para>
- <note>
+ <!-- The core configuration -->
+ <bean name="BackupConfiguration" class="org.hornetq.core.config.impl.FileConfiguration">
+ <property
+ name="configurationUrl">${jboss.server.home.url}/deploy/hornetq-backup1/hornetq-configuration.xml</property>
+ </bean>
+
+
+ <!-- The core server -->
+ <bean name="BackupHornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="BackupConfiguration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="BackupJMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="BackupHornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+ </deployment>
+ </programlisting>
<para>
- Notice that the names of the beans have been changed from that of the live servers configuration. This
- is
- so
- there is no clash. Obviously if you add more backup servers you will need to rename those as well,
- backup1,
- backup2 etc.
+ The first thing to notice is the BackupConfiguration bean. This is configured to pick up the
+ configuration for the
+ server, which we will place in the same directory.
</para>
- </note>
- <para>
- Now lets add the server configuration in
- <literal>hornetq-configuration.xml</literal>
- and add it to the same directory
- <literal>deploy/hornetq-backup1</literal>
- and configure it like so:
- </para>
- <programlisting>
- <configuration xmlns="urn:hornetq"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <para>
+ After that we just configure a new HornetQ Server and JMS server.
+ </para>
+ <note>
+ <para>
+ Notice that the names of the beans have been changed from those of the live server's configuration.
+ This is so
+ there is no clash. Obviously if you add more backup servers you will need to rename those as well:
+ backup1,
+ backup2, etc.
+ </para>
+ </note>
+ <para>
+ Now let's add the server configuration in
+ <literal>hornetq-configuration.xml</literal>,
+ add it to the same directory,
+ <literal>deploy/hornetq-backup1</literal>,
+ and configure it like so:
+ </para>
+ <programlisting>
+ <configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
- <jmx-domain>org.hornetq.backup1</jmx-domain>
+ <jmx-domain>org.hornetq.backup1</jmx-domain>
- <clustered>true</clustered>
+ <clustered>true</clustered>
- <backup>true</backup>
+ <backup>true</backup>
- <shared-store>true</shared-store>
+ <shared-store>true</shared-store>
- <allow-failback>true</allow-failback>
+ <allow-failback>true</allow-failback>
- <log-delegate-factory-class-name>org.hornetq.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>
+ <log-delegate-factory-class-name>org.hornetq.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>
- <bindings-directory>${jboss.server.data.dir}/hornetq-backup/bindings</bindings-directory>
+ <bindings-directory>/media/shared/data/hornetq-backup/bindings</bindings-directory>
- <journal-directory>${jboss.server.data.dir}/hornetq-backup/journal</journal-directory>
+ <journal-directory>/media/shared/data/hornetq-backup/journal</journal-directory>
- <journal-min-files>10</journal-min-files>
+ <journal-min-files>10</journal-min-files>
- <large-messages-directory>${jboss.server.data.dir}/hornetq-backup/largemessages</large-messages-directory>
+ <large-messages-directory>/media/shared/data/hornetq-backup/largemessages</large-messages-directory>
- <paging-directory>${jboss.server.data.dir}/hornetq/paging</paging-directory>
+ <paging-directory>/media/shared/data/hornetq-backup/paging</paging-directory>
- <connectors>
- <connector name="netty-connector">
- <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
- <param key="host" value="${jboss.bind.address:localhost}"/>
- <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
- </connector>
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
+ </connector>
- <!--The connector to the live node that corresponds to this backup-->
- <connector name="my-live-connector">
- <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
- <param key="host" value="my-live-host"/>
- <param key="port" value="${hornetq.remoting.netty.port:5445}"/>
- </connector>
+ <connector name="in-vm">
+ <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
+ <param key="server-id" value="${hornetq.server-id:0}"/>
+ </connector>
- <!--invm connector added by the live server on this node, used by the bridges-->
- <connector name="in-vm">
- <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
- <param key="server-id" value="${hornetq.server-id:0}"/>
- </connector>
+ </connectors>
- </connectors>
+ <acceptors>
+ <acceptor name="netty">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
+ </acceptor>
+ </acceptors>
- <acceptors>
- <acceptor name="netty">
- <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
- <param key="host" value="${jboss.bind.address:localhost}"/>
- <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
- </acceptor>
- </acceptors>
+ <broadcast-groups>
+ <broadcast-group name="bg-group1">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>1000</broadcast-period>
+ <connector-ref>netty-connector</connector-ref>
+ </broadcast-group>
+ </broadcast-groups>
- <broadcast-groups>
- <broadcast-group name="bg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <broadcast-period>1000</broadcast-period>
- <connector-ref>netty-connector</connector-ref>
- </broadcast-group>
- </broadcast-groups>
+ <discovery-groups>
+ <discovery-group name="dg-group1">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>60000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
- <discovery-groups>
- <discovery-group name="dg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
- <refresh-timeout>60000</refresh-timeout>
- </discovery-group>
- </discovery-groups>
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <connector-ref>netty-connector</connector-ref>
+ <discovery-group-ref discovery-group-name="dg-group1"/>
+ </cluster-connection>
+ </cluster-connections>
- <cluster-connections>
- <cluster-connection name="my-cluster">
- <address>jms</address>
- <connector-ref>netty-connector</connector-ref>
- <discovery-group-ref discovery-group-name="dg-group1"/>
- </cluster-connection>
- </cluster-connections>
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
- <!-- We need to create a core queue for the JMS queue explicitly because the bridge will be deployed
- before the JMS queue is deployed, so the first time, it otherwise won't find the queue -->
- <queues>
- <queue name="jms.queue.testQueue">
- <address>jms.queue.testQueue</address>
- </queue>
- </queues>
- <!-- We set-up a bridge that forwards from a the queue on this node to the same address on the live
- node.
- -->
- <bridges>
- <bridge name="testQueueBridge">
- <queue-name>jms.queue.testQueue</queue-name>
- <forwarding-address>jms.queue.testQueue</forwarding-address>
- <reconnect-attempts>-1</reconnect-attempts>
- <static-connectors>
- <connector-ref>in-vm</connector-ref>
- </static-connectors>
- </bridge>
- </bridges>
+ <address-settings>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>jms.queue.DLQ</dead-letter-address>
+ <expiry-address>jms.queue.ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>10485760</max-size-bytes>
+ <message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
+ </address-setting>
+ </address-settings>
- <security-settings>
- <security-setting match="#">
- <permission type="createNonDurableQueue" roles="guest"/>
- <permission type="deleteNonDurableQueue" roles="guest"/>
- <permission type="consume" roles="guest"/>
- <permission type="send" roles="guest"/>
- </security-setting>
- </security-settings>
+ </configuration>
- <address-settings>
- <!--default for catch all-->
- <address-setting match="#">
- <dead-letter-address>jms.queue.DLQ</dead-letter-address>
- <expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
- <max-size-bytes>10485760</max-size-bytes>
- <message-counter-history-day-limit>10</message-counter-history-day-limit>
- <address-full-policy>BLOCK</address-full-policy>
- </address-setting>
- </address-settings>
-
- </configuration>
-
- </programlisting>
- <para>
- The first thing you can see is we have added a <literal>jmx-domain</literal> attribute, this is used when
- adding objects, such as the HornetQ server and JMS server to jmx, we change this from the default <literal>org.hornetq</literal>
- to avoid naming clashes with the live server
- </para>
- <para>
- The first important part of the configuration is to make sure that this server starts as a backup server not
- a live server, via the <literal>backup</literal> attribute.
- </para>
- <para>
- After that we have the same cluster configuration as live, that is <literal>clustered</literal> is true and
- <literal>shared-store</literal> is true. However you can see we have added a new configuration element
- <literal>allow-failback</literal>. When this is set to true then this backup server will automatically stop
- and fall back into backup mode if failover occurs and the live server has become available. If false then
- the user will have to stop the server manually.
- </para>
- <para>
- Next we can see the configuration for the journal location, as in the live configuration this must point to
- the same directory as this backup's live server.
- </para>
- <para>
- Now we see the connectors configuration, we have 3 defined which are needed for the following
- </para>
- <itemizedlist>
- <listitem>
- <para>
- <literal>netty-connector.</literal> This is the connector used to connect to this backup server once live.
- </para>
- </listitem>
- <listitem>
- <para>
- <literal>my-live-connector.</literal> This is the connector to the live server that this backup is paired with.
- It is used by the cluster connection to announce its presence as a backup and to form the cluster when
- this backup becomes live. In reality it doesn't matter what connector the cluster connection uses, it
- could actually use the invm connector and broadcast its presence via the server on this node if we wanted.
- </para>
- </listitem>
- <listitem>
- <para>
- <literal>in-vm.</literal> This is the invm connector that is created by the live server on the same
- node. We will use this to create a bridge to the live server to forward messages to.
- </para>
- </listitem>
- </itemizedlist>
- <para>After that you will see the acceptors defined. This is the acceptor where clients will reconnect.</para>
- <para>
- The Broadcast groups, Discovery group and cluster configurations are as per normal, details of these
- can be found in the HornetQ user manual.
- </para>
- <para>
- The next part is of interest, here we define a list of queues and bridges. These must match any queues
- and addresses used by MDB's in the live servers configuration. At this point these must be statically
- defined, but this may change in future versions. Basically, for every queue or topic definition you need a
- queue configuration using the correct prefix <literal>jms.queue(topic)</literal> if using JMS, and a bridge
- definition that handles the forwarding of any message.
- </para>
- <note>
+ </programlisting>
<para>
- There is no such thing as a topic in core HornetQ, this is basically just an address so we need to create
- a queue that matches the jms address, that is, <literal>jms.topic.testTopic</literal>.
+ The first thing you can see is that we have added a
+ <literal>jmx-domain</literal>
+ attribute. This is used when
+ registering objects, such as the HornetQ server and JMS server, with JMX; we change this from the default
+ <literal>org.hornetq</literal>
+ to avoid naming clashes with the live server.
</para>
- </note>
+ <para>
+ The first important part of the configuration is to make sure that this server starts as a backup
+ server, not
+ a live server, via the
+ <literal>backup</literal>
+ attribute.
+ </para>
+ <para>
+ After that we have the same cluster configuration as the live server, that is,
+ <literal>clustered</literal>
+ is true and
+ <literal>shared-store</literal>
+ is true. However, you can see we have added a new configuration element,
+ <literal>allow-failback</literal>. When this is set to true, this backup server will automatically
+ stop
+ and fall back into backup mode if failover has occurred and the live server has become available again. If
+ false,
+ the user will have to stop the server manually.
+ </para>
+ <para>
+ Next we can see the configuration for the journal location; as in the live configuration, this must
+ point to
+ the same directory as this backup's live server.
+ </para>
+ <para>
+ Now we see the connectors configuration; two connectors are defined, which are needed for the following:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>netty-connector.</literal>
+ This is the connector used to connect to this backup server once live.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>After that you will see the acceptors defined. This is the acceptor where clients will reconnect.
+ </para>
+ <para>
+ The broadcast group, discovery group, and cluster connection configurations are as normal; details of
+ these can be found in the HornetQ user manual.
+ </para>
+ <para>
+ When the backup becomes live it will not be servicing any JEE components on this EAP instance. Instead, any
+ existing messages will be redistributed around the cluster and new messages forwarded to and from the backup
+ to service any remote clients it has (if it has any).
+ </para>
+ </section>
+ <section>
+ <title>Configuring multiple backups</title>
+ <para>
+ In this instance we have assumed that there are only 2 nodes, where each node has a backup for the other
+ node. However, you may want to configure a server to have multiple backup nodes. For example, you may want
+ 3 nodes where each node has 2 backups, one for each of the other 2 live servers. For this you would simply
+ copy the backup configuration and make sure you do the following:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ Make sure that you give all the beans in the <literal>hornetq-jboss-beans.xml</literal> configuration
+ file a unique name, i.e.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </section>
+ <section>
+ <title>Running the shipped example</title>
+ <para>
+ EAP ships with an example configuration for this topology. Look under <literal>extras/hornetq/resources/examples/symmetric-cluster-with-backups-colocated</literal>
+ and follow the readme.
+ </para>
+ </section>
</section>
</section>
<section>
JBoss hornetq SVN: r10109 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-01-07 11:49:22 -0500 (Fri, 07 Jan 2011)
New Revision: 10109
Modified:
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
added tostring for extra debug to configuration
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-01-07 15:57:35 UTC (rev 10108)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-01-07 16:49:22 UTC (rev 10109)
@@ -1379,4 +1379,18 @@
this.name = name;
}
+ @Override
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer("HornetQ Configuration (");
+ sb.append("clustered=").append(clustered).append(",");
+ sb.append("backup=").append(backup).append(",");
+ sb.append("sharedStore=").append(sharedStore).append(",");
+ sb.append("journalDirectory=").append(journalDirectory).append(",");
+ sb.append("bindingsDirectory=").append(bindingsDirectory).append(",");
+ sb.append("largeMessagesDirectory=").append(largeMessagesDirectory).append(",");
+ sb.append("pagingDirectory=").append(pagingDirectory);
+ sb.append(")");
+ return sb.toString();
+ }
}
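The `toString()` added above can be exercised in isolation. The following standalone sketch mirrors the same pattern with a reduced, hypothetical set of fields (the real `ConfigurationImpl` has many more, and uses `StringBuffer`; `StringBuilder` is the unsynchronized equivalent):

```java
// Standalone sketch of the toString() pattern from the commit above.
// Field names are illustrative, not the full HornetQ Configuration class.
public class ConfigSummary {
    private final boolean clustered;
    private final boolean backup;
    private final String journalDirectory;

    public ConfigSummary(boolean clustered, boolean backup, String journalDirectory) {
        this.clustered = clustered;
        this.backup = backup;
        this.journalDirectory = journalDirectory;
    }

    @Override
    public String toString() {
        // Append each key=value pair, comma-separated, inside parentheses
        StringBuilder sb = new StringBuilder("HornetQ Configuration (");
        sb.append("clustered=").append(clustered).append(",");
        sb.append("backup=").append(backup).append(",");
        sb.append("journalDirectory=").append(journalDirectory);
        sb.append(")");
        return sb.toString();
    }
}
```

Logging the configuration this way at startup (as the `HornetQServerImpl` change below does) makes it cheap to confirm which journal directories and HA flags a node actually picked up.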
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-01-07 15:57:35 UTC (rev 10108)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-01-07 16:49:22 UTC (rev 10109)
@@ -15,6 +15,7 @@
import java.io.File;
import java.lang.management.ManagementFactory;
+import java.nio.channels.ClosedChannelException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -427,6 +428,10 @@
{
//this is ok, we are being stopped
}
+ catch (ClosedChannelException e)
+ {
+ //this is ok too, we are being stopped
+ }
catch (Exception e)
{
if(!(e.getCause() instanceof InterruptedException))
@@ -522,7 +527,7 @@
return;
}
- HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " server is starting..");
+ HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " server is starting with configuration " + configuration);
if (configuration.isRunSyncSpeedTest())
{
@@ -530,7 +535,7 @@
test.run();
}
-
+
if (!configuration.isBackup())
{
if (configuration.isSharedStore())
JBoss hornetq SVN: r10108 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-01-07 10:57:35 -0500 (Fri, 07 Jan 2011)
New Revision: 10108
Modified:
trunk/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
Log:
fix for https://issues.jboss.org/browse/JBPAPP-5709
Modified: trunk/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-01-06 15:55:56 UTC (rev 10107)
+++ trunk/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-01-07 15:57:35 UTC (rev 10108)
@@ -20,6 +20,7 @@
import org.hornetq.utils.UUIDGenerator;
import java.io.File;
+import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -52,6 +53,8 @@
private static final byte NOT_STARTED = 'N';
+ private static final byte FIRST_TIME_START = '0';
+
private FileChannel channel;
private FileLock liveLock;
@@ -74,15 +77,34 @@
}
File file = new File(directory, SERVER_LOCK_NAME);
+ boolean fileCreated = false;
+
if (!file.exists())
{
- file.createNewFile();
+ fileCreated = file.createNewFile();
+ if(!fileCreated)
+ {
+ throw new IllegalStateException("Unable to create server lock file");
+ }
}
RandomAccessFile raFile = new RandomAccessFile(file, ACCESS_MODE);
channel = raFile.getChannel();
+ if (fileCreated)
+ {
+ ByteBuffer id = ByteBuffer.allocateDirect(3);
+ byte[] bytes = new byte[3];
+ bytes[0] = FIRST_TIME_START;
+ bytes[1] = FIRST_TIME_START;
+ bytes[2] = FIRST_TIME_START;
+ id.put(bytes, 0, 3);
+ id.position(0);
+ channel.write(id, 0);
+ channel.force(true);
+ }
+
createNodeId();
super.start();
@@ -127,25 +149,25 @@
do
{
byte state = getState();
- while (state == NOT_STARTED || state == 0)
+ while (state == NOT_STARTED || state == FIRST_TIME_START)
{
- log.info("awaiting live node startup state='" + state + "'");
+ log.debug("awaiting live node startup state='" + state + "'");
Thread.sleep(2000);
state = getState();
}
- liveLock = channel.lock(LIVE_LOCK_POS, 1, false);
+ liveLock = lock(LIVE_LOCK_POS, 1);
state = getState();
if (state == PAUSED)
{
liveLock.release();
- log.info("awaiting live node restarting");
+ log.debug("awaiting live node restarting");
Thread.sleep(2000);
}
else if (state == FAILINGBACK)
{
liveLock.release();
- log.info("awaiting live node failing back");
+ log.debug("awaiting live node failing back");
Thread.sleep(2000);
}
else if (state == LIVE)
@@ -161,7 +183,7 @@
log.info("Waiting to become backup node");
- backupLock = channel.lock(BACKUP_LOCK_POS, LOCK_LENGTH, false);
+ backupLock = lock(BACKUP_LOCK_POS, LOCK_LENGTH);
log.info("** got backup lock");
readNodeId();
@@ -173,7 +195,7 @@
log.info("Waiting to obtain live lock");
- liveLock = channel.lock(LIVE_LOCK_POS, LOCK_LENGTH, false);
+ liveLock = lock(LIVE_LOCK_POS, LOCK_LENGTH);
log.info("Live Server Obtained live lock");
@@ -286,5 +308,35 @@
nodeID = new SimpleString(uuid.toString());
}
}
+
+ private FileLock lock(int liveLockPos, int i) throws IOException
+ {
+ try
+ {
+ return channel.lock(liveLockPos, i, false);
+ }
+ catch (IOException e)
+ {
+ //todo this is here because sometimes channel.lock throws a resource deadlock exception but trylock works, need to investigate further and review
+ FileLock lock;
+ do
+ {
+ lock = channel.tryLock(liveLockPos, i, false);
+ if (lock == null)
+ {
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e1)
+ {
+ //
+ }
+ }
+ }
+ while(lock == null);
+ return lock;
+ }
+ }
}
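The fallback in the new `lock()` helper above — attempt a blocking `channel.lock()`, and on an unexpected `IOException` poll `tryLock()` until the lock is obtained — can be reproduced standalone against a temporary file:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockRetryDemo {
    // Mirrors FileLockNodeManager.lock(): blocking lock first, then
    // fall back to polling tryLock() if lock() throws an IOException.
    static FileLock lockWithRetry(FileChannel channel, long pos, long size)
            throws IOException, InterruptedException {
        try {
            return channel.lock(pos, size, false); // exclusive region lock
        } catch (IOException e) {
            FileLock lock;
            do {
                lock = channel.tryLock(pos, size, false);
                if (lock == null) {
                    Thread.sleep(500); // held elsewhere; back off and retry
                }
            } while (lock == null);
            return lock;
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("server", ".lock");
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            FileLock lock = lockWithRetry(ch, 0, 1);
            System.out.println(lock.isValid()); // prints "true"
            lock.release();
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Note the in-flight TODO in the commit: the blocking `lock()` occasionally reports a resource deadlock that `tryLock()` does not, which is why the polling fallback exists at all.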
JBoss hornetq SVN: r10107 - in trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest: topic and 1 other directory.
by do-not-reply@jboss.org
Author: bill.burke(a)jboss.com
Date: 2011-01-06 10:55:56 -0500 (Thu, 06 Jan 2011)
New Revision: 10107
Modified:
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicResource.java
Log:
add atom link to queue/topic media type for all interactions
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueResource.java 2011-01-05 16:30:39 UTC (rev 10106)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueResource.java 2011-01-06 15:55:56 UTC (rev 10107)
@@ -36,12 +36,19 @@
public Response get(@Context UriInfo uriInfo)
{
+ StringBuilder msg = new StringBuilder();
+ msg.append("<queue>")
+ .append("<name>").append(destination).append("</name>")
+ .append("<atom:link rel=\"create\" href=\"").append(createSenderLink(uriInfo)).append("\"/>")
+ .append("<atom:link rel=\"create-with-id\" href=\"").append(createSenderWithIdLink(uriInfo)).append("\"/>")
+ .append("<atom:link rel=\"pull-consumers\" href=\"").append(createConsumersLink(uriInfo)).append("\"/>")
+ .append("<atom:link rel=\"push-consumers\" href=\"").append(createPushConsumersLink(uriInfo)).append("\"/>")
- String msg = "<queue>"
- + "<name>" + destination + "</name>"
- + "</queue>";
- Response.ResponseBuilder builder = Response.ok(msg);
+ .append("</queue>");
+
+ Response.ResponseBuilder builder = Response.ok(msg.toString());
setSenderLink(builder, uriInfo);
+ setSenderWithIdLink(builder, uriInfo);
setConsumersLink(builder, uriInfo);
setPushConsumersLink(builder, uriInfo);
return builder.build();
@@ -61,35 +68,59 @@
protected void setSenderLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createSenderLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "create", "create", uri, null);
+ }
+
+ protected String createSenderLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("create");
String uri = builder.build().toString();
- serviceManager.getLinkStrategy().setLinkHeader(response, "create", "create", uri, null);
+ return uri;
}
protected void setSenderWithIdLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createSenderWithIdLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "create-with-id", "create-with-id", uri, null);
+ }
+
+ protected String createSenderWithIdLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("create");
String uri = builder.build().toString();
uri += "/{id}";
- serviceManager.getLinkStrategy().setLinkHeader(response, "create-with-id", "create-with-id", uri, null);
+ return uri;
}
protected void setConsumersLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createConsumersLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "pull-consumers", "pull-consumers", uri, null);
+ }
+
+ protected String createConsumersLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("pull-consumers");
String uri = builder.build().toString();
- serviceManager.getLinkStrategy().setLinkHeader(response, "pull-consumers", "pull-consumers", uri, null);
+ return uri;
}
protected void setPushConsumersLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createPushConsumersLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "push-consumers", "push-consumers", uri, null);
+ }
+
+ protected String createPushConsumersLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("push-consumers");
String uri = builder.build().toString();
- serviceManager.getLinkStrategy().setLinkHeader(response, "push-consumers", "push-consumers", uri, null);
+ return uri;
}
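The refactoring above splits each `setXxxLink` into a `createXxxLink` that returns the URI, so the same URI can be embedded both in the `Link` response header and inline in the XML body. A minimal sketch of the body-building half (the base URI and the single `create` link are placeholders; the real resource emits four links):

```java
// Sketch of the atom-link-in-body pattern from QueueResource.get().
public class QueueXml {
    // Render a queue representation with its "create" link embedded inline,
    // mirroring how createSenderLink() is reused for header and body.
    static String render(String name, String base) {
        StringBuilder msg = new StringBuilder();
        msg.append("<queue>")
           .append("<name>").append(name).append("</name>")
           .append("<atom:link rel=\"create\" href=\"").append(base).append("/create\"/>")
           .append("</queue>");
        return msg.toString();
    }
}
```

Embedding the links in the body means clients that cannot read the custom `Link` headers can still discover the interaction URIs from the media type itself, which is the point of this commit.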
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicResource.java 2011-01-05 16:30:39 UTC (rev 10106)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/TopicResource.java 2011-01-06 15:55:56 UTC (rev 10107)
@@ -37,12 +37,19 @@
public Response get(@Context UriInfo uriInfo)
{
+ StringBuilder msg = new StringBuilder();
+ msg.append("<topic>")
+ .append("<name>").append(destination).append("</name>")
+ .append("<atom:link rel=\"create\" href=\"").append(createSenderLink(uriInfo)).append("\"/>")
+ .append("<atom:link rel=\"create-with-id\" href=\"").append(createSenderWithIdLink(uriInfo)).append("\"/>")
+ .append("<atom:link rel=\"pull-consumers\" href=\"").append(createSubscriptionsLink(uriInfo)).append("\"/>")
+ .append("<atom:link rel=\"push-consumers\" href=\"").append(createPushSubscriptionsLink(uriInfo)).append("\"/>")
- String msg = "<topic>"
- + "<name>" + destination + "</name>"
- + "</topic>";
- Response.ResponseBuilder builder = Response.ok(msg);
+ .append("</topic>");
+
+ Response.ResponseBuilder builder = Response.ok(msg.toString());
setSenderLink(builder, uriInfo);
+ setSenderWithIdLink(builder, uriInfo);
setSubscriptionsLink(builder, uriInfo);
setPushSubscriptionsLink(builder, uriInfo);
return builder.build();
@@ -62,35 +69,59 @@
protected void setSenderLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createSenderLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "create", "create", uri, null);
+ }
+
+ protected String createSenderLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("create");
String uri = builder.build().toString();
- serviceManager.getLinkStrategy().setLinkHeader(response, "create", "create", uri, null);
+ return uri;
}
protected void setSenderWithIdLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createSenderWithIdLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "create-with-id", "create-with-id", uri, null);
+ }
+
+ protected String createSenderWithIdLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("create");
String uri = builder.build().toString();
uri += "/{id}";
- serviceManager.getLinkStrategy().setLinkHeader(response, "create-with-id", "create-with-id", uri, null);
+ return uri;
}
protected void setSubscriptionsLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createSubscriptionsLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "pull-subscriptions", "pull-subscriptions", uri, null);
+ }
+
+ protected String createSubscriptionsLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("pull-subscriptions");
String uri = builder.build().toString();
- serviceManager.getLinkStrategy().setLinkHeader(response, "pull-subscriptions", "pull-subscriptions", uri, null);
+ return uri;
}
protected void setPushSubscriptionsLink(Response.ResponseBuilder response, UriInfo info)
{
+ String uri = createPushSubscriptionsLink(info);
+ serviceManager.getLinkStrategy().setLinkHeader(response, "push-subscriptions", "push-subscriptions", uri, null);
+ }
+
+ protected String createPushSubscriptionsLink(UriInfo info)
+ {
UriBuilder builder = info.getRequestUriBuilder();
builder.path("push-subscriptions");
String uri = builder.build().toString();
- serviceManager.getLinkStrategy().setLinkHeader(response, "push-subscriptions", "push-subscriptions", uri, null);
+ return uri;
}
JBoss hornetq SVN: r10106 - trunk/src/config/jboss-as-6/clustered.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-01-05 11:30:39 -0500 (Wed, 05 Jan 2011)
New Revision: 10106
Modified:
trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
Log:
added system prop for setting the journal data dir
Modified: trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-01-04 00:35:39 UTC (rev 10105)
+++ trunk/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-01-05 16:30:39 UTC (rev 10106)
@@ -24,15 +24,15 @@
<log-delegate-factory-class-name>org.hornetq.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>
- <bindings-directory>${jboss.server.data.dir}/hornetq/bindings</bindings-directory>
+ <bindings-directory>${jboss.server.data.dir}/${hornetq.data.dir:hornetq}/bindings</bindings-directory>
- <journal-directory>${jboss.server.data.dir}/hornetq/journal</journal-directory>
+ <journal-directory>${jboss.server.data.dir}/${hornetq.data.dir:hornetq}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
- <large-messages-directory>${jboss.server.data.dir}/hornetq/largemessages</large-messages-directory>
+ <large-messages-directory>${jboss.server.data.dir}/${hornetq.data.dir:hornetq}/largemessages</large-messages-directory>
- <paging-directory>${jboss.server.data.dir}/hornetq/paging</paging-directory>
+ <paging-directory>${jboss.server.data.dir}/${hornetq.data.dir:hornetq}/paging</paging-directory>
<connectors>
<connector name="netty">
JBoss hornetq SVN: r10105 - in trunk: hornetq-rest/docbook/reference/en and 3 other directories.
by do-not-reply@jboss.org
Author: bill.burke(a)jboss.com
Date: 2011-01-03 19:35:39 -0500 (Mon, 03 Jan 2011)
New Revision: 10105
Modified:
trunk/hornetq-rest/docbook/reference/en/master.xml
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
trunk/pom.xml
trunk/src/Hornetq.iml
Log:
deadlock bug and expiration/priority support
Modified: trunk/hornetq-rest/docbook/reference/en/master.xml
===================================================================
--- trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/docbook/reference/en/master.xml 2011-01-04 00:35:39 UTC (rev 10105)
@@ -605,7 +605,7 @@
<literal>msg-create</literal> header.</para>
<para><programlisting>POST /queues/jms.queue.bar/create
-Host: example.xom
+Host: example.com
Content-Type: application/xml
<order>
@@ -687,7 +687,7 @@
<literal>msg-create</literal> header.</para>
<para><programlisting>POST /queues/jms.queue.bar/create
-Host: example.xom
+Host: example.com
Content-Type: application/xml
<order>
@@ -799,7 +799,7 @@
that.</para>
<programlisting>POST /queues/jms.queue.bar/create?durable=true
-Host: example.xom
+Host: example.com
Content-Type: application/xml
<order>
@@ -809,6 +809,20 @@
</order>
</programlisting>
</sect1>
+ <sect1>
+ <title>Expiration and Priority</title>
+ <para>You can set the expiration and the priority of the message in the queue or topic by setting an additional query parameter. The <literal>expiration</literal> query parameter is an integer expressing the time in milliseconds until the message should be expired. The <literal>priority</literal> is another query parameter with an integer value between 0 and 9 expressing the priority of the message. For example:</para>
+ <programlisting>POST /queues/jms.queue.bar/create?expiration=30000&priority=3
+Host: example.com
+Content-Type: application/xml
+
+<order>
+ <name>Bill</name>
+ <item>iPhone4</item>
+ <cost>$199.99</cost>
+</order>
+</programlisting>
+ </sect1>
</chapter>
<chapter>
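The arithmetic implied by the new "Expiration and Priority" section is simple but worth pinning down: `expiration` is a relative offset added to the current time, and `priority` must stay within the JMS range 0-9 (4 being the JMS default). A hedged sketch, with helper names that are illustrative rather than the HornetQ REST API:

```java
// Illustrative helpers for the two query parameters described above.
public class MessageParams {
    // "expiration" is milliseconds from now; the stored value is absolute.
    static long expirationTime(long nowMillis, long expirationParamMillis) {
        return nowMillis + expirationParamMillis;
    }

    // JMS priorities are 0..9; clamp anything outside that range.
    static int clampPriority(int p) {
        if (p < 0) return 0;
        if (p > 9) return 9;
        return p;
    }
}
```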
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -79,24 +79,18 @@
this.consumerTimeoutSeconds = consumerTimeoutSeconds;
}
- private Object timeoutLock = new Object();
-
- @Override
public void testTimeout(String target)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer = queueConsumers.get(target);
+ if (consumer == null) return;
+ synchronized (consumer)
{
- QueueConsumer consumer = queueConsumers.get(target);
- if (consumer == null) return;
- synchronized (consumer)
+ if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
{
- if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
- {
- log.warn("shutdown REST consumer because of timeout for: " + consumer.getId());
- consumer.shutdown();
- queueConsumers.remove(consumer.getId());
- serviceManager.getTimeoutTask().remove(consumer.getId());
- }
+ log.warn("shutdown REST consumer because of timeout for: " + consumer.getId());
+ consumer.shutdown();
+ queueConsumers.remove(consumer.getId());
+ serviceManager.getTimeoutTask().remove(consumer.getId());
}
}
}
@@ -122,7 +116,7 @@
{
attributes = attributes | SELECTOR_SET;
}
-
+
if (autoAck)
{
consumer = createConsumer(selector);
@@ -141,11 +135,11 @@
if (autoAck)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment + "/" + consumer.getId(), "-1");
}
else
{
- AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
+ AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment + "/" + consumer.getId(), "-1");
}
return builder.build();
@@ -160,16 +154,18 @@
}
}
+ protected void addConsumer(QueueConsumer consumer)
+ {
+ queueConsumers.put(consumer.getId(), consumer);
+ serviceManager.getTimeoutTask().add(this, consumer.getId());
+ }
+
public QueueConsumer createConsumer(String selector)
throws HornetQException
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
- synchronized (timeoutLock)
- {
- queueConsumers.put(genId, consumer);
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
+ addConsumer(consumer);
return consumer;
}
@@ -178,11 +174,7 @@
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
- synchronized (timeoutLock)
- {
- queueConsumers.put(genId, consumer);
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
+ addConsumer(consumer);
return consumer;
}
@@ -206,9 +198,9 @@
// we synchronize just in case a failed request is still processing
synchronized (consumer)
{
- if ( (attributes & ACKNOWLEDGED) > 0)
+ if ((attributes & ACKNOWLEDGED) > 0)
{
- AcknowledgedQueueConsumer ackedConsumer = (AcknowledgedQueueConsumer)consumer;
+ AcknowledgedQueueConsumer ackedConsumer = (AcknowledgedQueueConsumer) consumer;
Acknowledgement ack = ackedConsumer.getAck();
if (ack == null || ack.wasSet())
{
@@ -237,7 +229,7 @@
QueueConsumer consumer = queueConsumers.get(consumerId);
if (consumer == null)
{
- if ( (attributes & SELECTOR_SET) > 0)
+ if ((attributes & SELECTOR_SET) > 0)
{
Response.ResponseBuilder builder = Response.status(Response.Status.GONE)
@@ -247,40 +239,37 @@
uriBuilder.path(uriInfo.getMatchedURIs().get(1));
serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-consumers", "pull-consumers", uriBuilder.build().toString(), null);
throw new WebApplicationException(builder.build());
-
+
}
- if ( (attributes & ACKNOWLEDGED) > 0)
+ if ((attributes & ACKNOWLEDGED) > 0)
{
QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
- consumer = addConsumerToMap(consumerId, tmp);
+ consumer = addReconnectedConsumerToMap(consumerId, tmp);
}
else
{
QueueConsumer tmp = new QueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
- consumer = addConsumerToMap(consumerId, tmp);
+ consumer = addReconnectedConsumerToMap(consumerId, tmp);
}
}
return consumer;
}
- private QueueConsumer addConsumerToMap(String consumerId, QueueConsumer tmp)
+ private QueueConsumer addReconnectedConsumerToMap(String consumerId, QueueConsumer tmp)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer;
+ consumer = queueConsumers.putIfAbsent(consumerId, tmp);
+ if (consumer != null)
{
- QueueConsumer consumer;
- consumer = queueConsumers.putIfAbsent(consumerId, tmp);
- if (consumer != null)
- {
- tmp.shutdown();
- }
- else
- {
- consumer = tmp;
- serviceManager.getTimeoutTask().add(this, consumer.getId());
- }
- return consumer;
+ tmp.shutdown();
}
+ else
+ {
+ consumer = tmp;
+ serviceManager.getTimeoutTask().add(this, consumer.getId());
+ }
+ return consumer;
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -42,13 +42,16 @@
return startupTime + Long.toString(counter.incrementAndGet());
}
- public void publish(HttpHeaders headers, byte[] body, String dup, boolean durable) throws Exception
+ public void publish(HttpHeaders headers, byte[] body, String dup,
+ boolean durable,
+ Long expiration,
+ Integer priority) throws Exception
{
Pooled pooled = getPooled();
try
{
ClientProducer producer = pooled.producer;
- ClientMessage message = createHornetQMessage(headers, body, durable, pooled.session);
+ ClientMessage message = createHornetQMessage(headers, body, durable, expiration, priority, pooled.session);
message.putStringProperty(ClientMessage.HDR_DUPLICATE_DETECTION_ID.toString(), dup);
producer.send(message);
pool.add(pooled);
@@ -69,14 +72,20 @@
@PUT
@Path("{id}")
- public Response putWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
+ public Response putWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
+ @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
{
- return postWithId(dupId, durable, headers, uriInfo, body);
+ return postWithId(dupId, durable, expiration, priority, headers, uriInfo, body);
}
@POST
@Path("{id}")
- public Response postWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable, @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
+ public Response postWithId(@PathParam("id") String dupId, @QueryParam("durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
+ @Context HttpHeaders headers, @Context UriInfo uriInfo, byte[] body)
{
String matched = uriInfo.getMatchedURIs().get(1);
UriBuilder nextBuilder = uriInfo.getBaseUriBuilder();
@@ -91,7 +100,7 @@
}
try
{
- publish(headers, body, dupId, isDurable);
+ publish(headers, body, dupId, isDurable, expiration, priority);
}
catch (Exception e)
{
@@ -217,10 +226,27 @@
}
- protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body, boolean durable, ClientSession session) throws Exception
+ protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body,
+ boolean durable,
+ Long expiration,
+ Integer priority,
+ ClientSession session) throws Exception
{
ClientMessage message = session.createMessage(Message.BYTES_TYPE, durable);
+ if (expiration != null)
+ {
+ message.setExpiration(expiration.longValue());
+ }
+ if (priority != null)
+ {
+ byte p = priority.byteValue();
+ if (p >= 0 && p <=9)
+ {
+ message.setPriority(p);
+ }
+ }
HttpMessageHelper.writeHttpMessage(headers, body, message);
return message;
}
+
}
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessageDupsOk.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -23,13 +23,15 @@
public class PostMessageDupsOk extends PostMessage
{
- public void publish(HttpHeaders headers, byte[] body, boolean durable) throws Exception
+ public void publish(HttpHeaders headers, byte[] body, boolean durable,
+ Long expiration,
+ Integer priority) throws Exception
{
Pooled pooled = getPooled();
try
{
ClientProducer producer = pooled.producer;
- ClientMessage message = createHornetQMessage(headers, body, durable, pooled.session);
+ ClientMessage message = createHornetQMessage(headers, body, durable, expiration, priority, pooled.session);
producer.send(message);
pool.add(pooled);
}
@@ -50,6 +52,8 @@
@POST
public Response create(@Context HttpHeaders headers,
@QueryParam("durable") Boolean durable,
+ @QueryParam("expiration") Long expiration,
+ @QueryParam("priority") Integer priority,
@Context UriInfo uriInfo,
byte[] body)
{
@@ -60,7 +64,7 @@
{
isDurable = durable.booleanValue();
}
- publish(headers, body, isDurable);
+ publish(headers, body, isDurable, expiration, priority);
}
catch (Exception e)
{
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2011-01-04 00:35:39 UTC (rev 10105)
@@ -82,24 +82,19 @@
this.destination = destination;
}
- private Object timeoutLock = new Object();
- @Override
public void testTimeout(String target)
{
- synchronized (timeoutLock)
+ QueueConsumer consumer = queueConsumers.get(target);
+ if (consumer == null) return;
+ synchronized (consumer)
{
- QueueConsumer consumer = queueConsumers.get(target);
- if (consumer == null) return;
- synchronized (consumer)
+ if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
{
- if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
- {
- log.warn("shutdown REST consumer because of session timeout for: " + consumer.getId());
- consumer.shutdown();
- queueConsumers.remove(consumer.getId());
- serviceManager.getTimeoutTask().remove(consumer.getId());
- }
+ log.warn("shutdown REST consumer because of session timeout for: " + consumer.getId());
+ consumer.shutdown();
+ queueConsumers.remove(consumer.getId());
+ serviceManager.getTimeoutTask().remove(consumer.getId());
}
}
}
@@ -372,28 +367,25 @@
QueueConsumer consumer;
if (subscriptionExists(subscriptionId))
{
- synchronized (timeoutLock)
+ QueueConsumer tmp = null;
+ try
{
- QueueConsumer tmp = null;
- try
- {
- tmp = createConsumer(true, autoAck, subscriptionId, null);
- }
- catch (HornetQException e)
- {
- throw new RuntimeException(e);
- }
- consumer = queueConsumers.putIfAbsent(subscriptionId, tmp);
- if (consumer == null)
- {
- consumer = tmp;
- serviceManager.getTimeoutTask().add(this, subscriptionId);
- }
- else
- {
- tmp.shutdown();
- }
+ tmp = createConsumer(true, autoAck, subscriptionId, null);
}
+ catch (HornetQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ consumer = queueConsumers.putIfAbsent(subscriptionId, tmp);
+ if (consumer == null)
+ {
+ consumer = tmp;
+ serviceManager.getTimeoutTask().add(this, subscriptionId);
+ }
+ else
+ {
+ tmp.shutdown();
+ }
}
else
{
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/pom.xml 2011-01-04 00:35:39 UTC (rev 10105)
@@ -22,7 +22,7 @@
<version>2.2.0.CR1</version>
<properties>
- <resteasy.version>2.0.1.GA</resteasy.version>
+ <resteasy.version>2.1.0.GA</resteasy.version>
</properties>
<name>HornetQ</name>
Modified: trunk/src/Hornetq.iml
===================================================================
--- trunk/src/Hornetq.iml 2011-01-02 05:04:27 UTC (rev 10104)
+++ trunk/src/Hornetq.iml 2011-01-04 00:35:39 UTC (rev 10105)
@@ -7,28 +7,31 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="library" name="jars" level="project" />
+ <orderEntry type="library" name="lib" level="project" />
<orderEntry type="library" name="lib10" level="project" />
- <orderEntry type="library" name="lib13" level="project" />
+ <orderEntry type="library" name="lib3" level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
- <root url="jar://$MODULE_DIR$/../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/" />
+ <root url="jar://$MODULE_DIR$/../thirdparty/org/twitter4j/lib/twitter4j-core.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
+ <orderEntry type="library" name="lib4" level="project" />
+ <orderEntry type="library" name="lib7" level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
- <root url="jar://$MODULE_DIR$/../thirdparty/org/twitter4j/lib/twitter4j-core.jar!/" />
+ <root url="jar://$MODULE_DIR$/../thirdparty/org/jboss/metadata/lib/jboss-metadata.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
- <orderEntry type="library" name="lib3" level="project" />
- <orderEntry type="library" name="lib5" level="project" />
+ <orderEntry type="library" name="lib2" level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
@@ -38,6 +41,8 @@
<SOURCES />
</library>
</orderEntry>
+ <orderEntry type="library" name="lib5" level="project" />
+ <orderEntry type="library" name="lib13" level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
@@ -47,8 +52,6 @@
<SOURCES />
</library>
</orderEntry>
- <orderEntry type="library" name="jars" level="project" />
- <orderEntry type="library" name="lib6" level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
@@ -58,9 +61,6 @@
<SOURCES />
</library>
</orderEntry>
- <orderEntry type="library" name="lib9" level="project" />
- <orderEntry type="library" name="lib4" level="project" />
- <orderEntry type="library" name="lib" level="project" />
</component>
</module>
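The recurring change in `ConsumersResource.java` and `SubscriptionsResource.java` above drops the shared `timeoutLock` and instead lets `ConcurrentMap.putIfAbsent` decide atomically which thread's consumer wins registration, shutting down the loser. A minimal standalone sketch of that idiom (class and method names here are hypothetical, not part of the commit):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the lock-free registration pattern the commit switches to:
// rather than guarding the whole map with one shared lock, putIfAbsent
// performs the check-then-insert as a single atomic step.
public class ConsumerRegistry {
    private final ConcurrentMap<String, String> consumers = new ConcurrentHashMap<String, String>();

    /** Returns whichever consumer ended up registered under this id. */
    public String register(String id, String candidate) {
        String existing = consumers.putIfAbsent(id, candidate);
        if (existing != null) {
            // Another thread registered first; our candidate is discarded
            // (the real code calls tmp.shutdown() at this point).
            return existing;
        }
        // Our candidate won the race; the real code additionally
        // schedules the timeout task for the new consumer here.
        return candidate;
    }
}
```

Because the atomicity lives in the map operation itself, per-consumer work (like the timeout check in `testTimeout`) only needs to synchronize on the individual consumer object, which is exactly the structure the refactored hunks show.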