JBoss hornetq SVN: r8155 - in trunk/docs/user-manual/en: diagrams and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-28 09:25:50 -0400 (Wed, 28 Oct 2009)
New Revision: 8155
Added:
trunk/docs/user-manual/en/diagrams/ha-replicated-store.odg
trunk/docs/user-manual/en/diagrams/ha-shared-store.odg
trunk/docs/user-manual/en/images/ha-replicated-store.png
trunk/docs/user-manual/en/images/ha-shared-store.png
Modified:
trunk/docs/user-manual/en/client-reconnection.xml
trunk/docs/user-manual/en/examples.xml
trunk/docs/user-manual/en/ha.xml
Log:
HA documentation (WIP)
Modified: trunk/docs/user-manual/en/client-reconnection.xml
===================================================================
--- trunk/docs/user-manual/en/client-reconnection.xml 2009-10-28 01:01:50 UTC (rev 8154)
+++ trunk/docs/user-manual/en/client-reconnection.xml 2009-10-28 13:25:50 UTC (rev 8155)
@@ -78,7 +78,7 @@
<para><literal>reconnect-attempts</literal>. This optional parameter determines the
total number of reconnect attempts the bridge will make before giving up and
shutting down. A value of <literal>-1</literal> signifies an unlimited number of
- attempts. The default value is <literal>-1</literal>.</para>
+ attempts. The default value is <literal>0</literal>.</para>
</listitem>
</itemizedlist>
<para>If you're using JMS, and you're using the JMS Service on the server to load your JMS
Added: trunk/docs/user-manual/en/diagrams/ha-replicated-store.odg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/user-manual/en/diagrams/ha-replicated-store.odg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/user-manual/en/diagrams/ha-shared-store.odg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/user-manual/en/diagrams/ha-shared-store.odg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Modified: trunk/docs/user-manual/en/examples.xml
===================================================================
--- trunk/docs/user-manual/en/examples.xml 2009-10-28 01:01:50 UTC (rev 8154)
+++ trunk/docs/user-manual/en/examples.xml 2009-10-28 13:25:50 UTC (rev 8155)
@@ -33,10 +33,7 @@
<para>Here's a listing of the examples with a brief description.</para>
<section id="application-level-failover">
<title>Application-Layer Failover</title>
- <para>HornetQ implements fully transparent automatic failover of connections from a live
- to backup node, this requires no special coding for failover, and is described in a
- different example. Automatic failover requires server replication.</para>
- <para>However, HornetQ also supports Application-Layer failover, useful in the case that
+ <para>HornetQ also supports Application-Layer failover, useful in the case that
replication is not enabled on the server side.</para>
<para>With Application-Layer failover, it's up to the application to register a JMS
<literal>ExceptionListener</literal> with HornetQ which will be called by
@@ -50,18 +47,6 @@
uncommitted work in the old session will be lost, and any unacknowledged messages
might be redelivered.</para>
</section>
- <section id="automatic-failover">
- <title>Automatic (Transparent) Failover</title>
- <para>The <literal>automatic-failover</literal> example demonstrates two servers coupled
- as a live-backup pair for high availability (HA), and a client connection
- transparently failing over from live to backup when the live server is
- crashed.</para>
- <para>HornetQ implements seamless, transparent failover of client connections between
- live and backup servers. This is implemented by the replication of state between
- live and backup nodes. When replication is configured and a live node crashes, the
- client connections can carry on as if nothing happened and carry on sending and
- consuming messages.</para>
- </section>
<section>
<title>Automatic Reconnect</title>
<para>The <literal>reconnect-same-node</literal> example demonstrates how HornetQ
@@ -334,6 +319,19 @@
<para>In some case buffering is not desirable, and HornetQ allows it to be switched off.
This example demonstrates that.</para>
</section>
+ <section id="examples.non-transaction-failover">
+ <title>Non-Transaction Failover With Server Data Replication</title>
+ <para>The <literal>non-transaction-failover</literal> example demonstrates two servers coupled
+ as a live-backup pair for high availability (HA), and a client using a <emphasis>non-transacted
+ </emphasis> JMS session failing over from live to backup when the live server is
+ crashed.</para>
+ <para>HornetQ implements failover of client connections between
+ live and backup servers. This is implemented by the replication of state between
+ live and backup nodes. When replication is configured and a live node crashes, the
+ client connections can carry and continue to send and consume messages. When non-transacted
+ sessions are used, once and only once message delivery is not guaranteed and it is possible
+ that some messages will be lost or delivered twice.</para>
+ </section>
<section id="examples.paging">
<title>Paging</title>
<para>The <literal>paging</literal> example shows how HornetQ can support huge queues
@@ -438,6 +436,18 @@
<para>The <literal>topic-selector-example1</literal> example shows you how to
selectively consume messages using message selectors with topic consumers.</para>
</section>
+ <section id="examples.transaction-failover">
+ <title>Transaction Failover With Data Replication</title>
+ <para>The <literal>transaction-failover</literal> example demonstrates two servers coupled
+ as a live-backup pair for high availability (HA), and a client using a transacted JMS
+ session failing over from live to backup when the live server is
+ crashed.</para>
+ <para>HornetQ implements failover of client connections between
+ live and backup servers. This is implemented by the replication of data between
+ live and backup nodes. When replication is configured and a live node crashes, the
+ client connections can carry and continue to send and consume messages. When transacted
+ sessions are used, once and only once message delivery is guaranteed.</para>
+ </section>
<section>
<title>Transactional Session</title>
<para>The <literal>transactional</literal> example shows you how to use a transactional
Modified: trunk/docs/user-manual/en/ha.xml
===================================================================
--- trunk/docs/user-manual/en/ha.xml 2009-10-28 01:01:50 UTC (rev 8154)
+++ trunk/docs/user-manual/en/ha.xml 2009-10-28 13:25:50 UTC (rev 8155)
@@ -24,110 +24,127 @@
client connections to migrate from one server to another in event of server failure so
client applications can continue to operate</emphasis>.</para>
<para>HornetQ provides high availability by replicating servers in pairs. It also provides both
- automatic client failover and application-level client failover.</para>
+ client failover and application-level client failover.</para>
<section>
- <title>Server replication</title>
+ <title>Live - Backup Pairs</title>
<para>HornetQ allows pairs of servers to be linked together as <emphasis>live -
backup</emphasis> pairs. In this release there is a single backup server for each
live server. Backup servers are not operational until failover occurs. In later releases
we will most likely support replication onto multiple backup servers.</para>
- <para>When a <emphasis>live - backup</emphasis> pair is configured, HornetQ ensures that the
- live server state is replicated to the backup server. Replicated state includes session
- state, and also global state such as the set of queues and addresses on the server. </para>
- <para>When a client fails over from live to backup server, the backup server will already
- have the correct global and session state, so the client will be able to resume its
- session(s) on the backup server as if nothing happened.</para>
- <para>Replication is performed in an asynchronous fashion between live and backup server.
- Data is replicated one way in a stream, and responses that the data has reached the
- backup is returned in another stream. By pipelining replications and responses to
- replications in separate streams allows replication throughput to be much higher than if
- we synchronously replicated data and waited for a response serially in an RPC manner
- before replicating the next piece of data.</para>
- <section id="configuring.live.backup">
- <title>Configuring live-backup pairs</title>
- <para>First, on the live server, in <literal>hornetq-configuration.xml</literal>
- configure the live server with knowledge of its backup server. This is done by
- specifying a <literal>backup-connector-ref</literal> element. This element
- references a connector, also specified on the live server which contains knowledge
- of how to connect to the backup server. Here's a snippet from <literal
- >hornetq-configuration.xml</literal> showing a live server configured with a
- backup server:</para>
- <programlisting><backup-connector-ref connector-name="backup-connector"/>
-
-<!-- Connectors -->
+ <para>Before failover, only the live server is serving the HornetQ clients while the backup server remains passive.
+ When clients fail over to the backup server, the backup server becomes active and start to service the HornetQ clients.</para>
+
+ <section id="ha.mode">
+ <title>HA modes</title>
+ <para>HornetQ provides two different modes for High Availability, either by <emphasis>replicating data</emphasis> from the live server journal
+ to the backup server or using a <emphasis>shared state</emphasis> for both servers.</para>
+ <section id="ha.mode.replicated">
+ <title>Data Replication</title>
+ <para>In this mode, data stored in HornetQ journal are replicated from the live servers's journal to the
+ backuper server's journal.</para>
+ <para>Replication is performed in an asynchronous fashion between live and backup server.
+ Data is replicated one way in a stream, and responses that the data has reached the
+ backup is returned in another stream. Pipelining replications and responses to
+ replications in separate streams allows replication throughput to be much higher than if
+ we synchronously replicated data and waited for a response serially in an RPC manner
+ before replicating the next piece of data.</para>
+ <graphic fileref="images/ha-replicated-store.png" align="center"/>
+ <section id="configuring.live.backup">
+ <title>Configuration</title>
+ <para>First, on the live server, in <literal>hornetq-configuration.xml</literal>,
+ configure the live server with knowledge of its backup server. This is done by
+ specifying a <literal>backup-connector-ref</literal> element. This element
+ references a connector, also specified on the live server which contains knowledge
+ of how to connect to the backup server.</para>
+ <para>Here's a snippet from live server's <literal
+ >hornetq-configuration.xml</literal> configured to connect to its backup server:</para>
+ <programlisting>
+ <backup-connector-ref connector-name="backup-connector"/>
-<connectors>
+ <connectors>
+ <!-- This connector specifies how to connect to the backup server -->
+ <!-- backup server is located on host "192.168.0.11" and port "5445" -->
+ <connector name="backup-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.host" value="192.168.0.11" type="String"/>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+ </connectors></programlisting>
+ <para>Secondly, on the backup server, we flag the server as a backup and make sure it has an acceptor that the live server can connect to:</para>
+ <programlisting>
+ <backup>true</backup>
- ...
-
- <!-- This connector specifies how to connect to the backup server -->
- <connector name="backup-connector">
- <factory-class>
- org.hornetq.integration.transports.netty.NettyConnectorFactory
- </factory-class>
- <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
- </connector>
-
-</connectors></programlisting>
- <para>Secondly, on the backup server, also in <literal
- >hornetq-configuration.xml</literal> , the element <literal>backup</literal>
- must be set to true. I.e. :</para>
- <programlisting><backup>true</backup>
-</programlisting>
+ <acceptors>
+ <acceptor name="acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.host" value="192.168.0.11" type="String"/>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </acceptor>
+ </acceptors>
+ </programlisting>
+ <para>For a backup server to function correctly it's also important that it has the same
+ set of bridges, predefined queues, cluster connections, broadcast groups and
+ discovery groups as defined on the live node. The easiest way to ensure this is just
+ to copy the entire server side configuration from live to backup and just make the
+ changes as specified above. </para>
+ </section>
+ <section>
+ <title>Synchronization of live-backup pairs</title>
+ <para>In order for live - backup pairs to operate properly, they must be identical
+ replicas. This means you cannot just use any backup server that's previously been
+ used for other purposes as a backup server, since it will have different data in its
+ persistent storage. If you try to do so you will receive an exception in the logs
+ and the server will fail to start.</para>
+ <para>To create a backup server for a live server that's already been used for other
+ purposes, it's necessary to copy the <literal>data</literal> directory from the live
+ server to the backup server. This means the backup server will have an identical
+ persistent store to the backup server.</para>
+ <para>After failover, when the live server is restarted, the backup server will copy its
+ journal back to the live server. When the live server has the updated journal, it will
+ become active again and the backup server will become passive.</para>
+ </section>
+ </section>
+ <section id="ha.mode.shared">
+ <title>Shared Store</title>
+ <para>When using a shared store, both live and backup servers share the <emphasis>same</emphasis> journal
+ using a shared file system. When failover occurs and backup server takes over, it will load the journal and
+ clients can connect to it.</para>
+ <graphic fileref="images/ha-shared-store.png" align="center"/>
+ <section id="ha/mode.shared.configuration">
+ <title>Configuration</title>
+ <para>To configure the live and backup server to share their store, configure both <literal>hornetq-configuration.xml</literal>:</para>
+ <programlisting>
+ <shared-store>true<shared-store>
+ </programlisting>
+ <para>In order for live - backup pairs to operate properly with a shared store, both servers
+ must have configured the location of journal directory to point
+ to the <emphasis>same shared location</emphasis> (as explained in <xref linkend="configuring.message.journal" />)</para>
+ </section>
+ <section>
+ <title>Synchronization of live-backup pairs</title>
+ <para>As both live and backup servers share the same journal, they do not need to be synchronized.
+ However until, both live and backup servers are up and running, high-availability can not be provided with a single server.
+ After failover, at first opportunity, stop the backup server (which is active) and restart the live and backup servers.</para>
+ </section>
+ </section>
</section>
- <section>
- <title>Synchronization of live-backup pairs</title>
- <para>In order for live - backup pairs to operate properly, they must be identical
- replicas. This means you cannot just use any backup server that's previously been
- used for other purposes as a backup server, since it will have different data in its
- persistent storage. If you try to do so you will receive an exception in the logs
- and the server will fail to start.</para>
- <para>To create a backup server for a live server that's already been used for other
- purposes, it's necessary to copy the <literal>data</literal> directory from the live
- server to the backup server. This means the backup server will have an identical
- persistent store to the backup server.</para>
- <para>Similarly when a client fails over from a live server <literal>L</literal> to a
- backup server <literal>B</literal>, the server <literal>L</literal> becomes invalid
- since, from that point on, the data on <literal>L</literal> and <literal>B</literal>
- may diverge. After such a failure, at the next available opportunity the <literal
- >B</literal> server should be taken down, and its <literal>data</literal>
- directory copied back to the <literal>L</literal> server. Live and backup servers
- can then be restarted. In this release of HornetQ we do not provide any automatic
- facility for re-assigning a backup node with a live node while it is running.</para>
- <para>For a backup server to function correctly it's also important that it has the same
- set of bridges, predefined queues, cluster connections, broadcast groups and
- discovery groups as defined on the live node. The easiest way to ensure this is just
- to copy the entire server side configuration from live to backup and just make the
- changes as specified in the previous section. </para>
- </section>
- <section id="queue.activation.timeout">
- <title>Queue activation timeout</title>
- <para>If a live server fails, as client connections failover from the live node to the
- backup, they do so at a rate determined by the client, and it might be the case that
- some client connections never fail over.</para>
- <para>Different client connections may have different consumers on the same queue(s).
- The queue on the backup will wait for all its consumers to reattach before
- activating delivery on itself. If all connections have not reattached with this
- timeout then the queue will activate regardless.</para>
- <para>This param is defined in <literal>hornetq-configuration.xml</literal> using the
- setting <literal>queue-activation-timeout</literal>. Its default value is <literal
- >30000</literal> milliseconds.</para>
- </section>
</section>
+
+ <section id="failover">
+ <title>Failover Modes</title>
+ <para>HornetQ defines 3 types of failover:</para>
+ <itemizedlist>
+ <listitem><para>100% transparent re-attach to a single server as explained in <xref linkend="client-reconnection" /></para></listitem>
+ <listitem><para>automatic failover</para></listitem>
+ <listitem><para>application-level failover</para></listitem>
+ </itemizedlist>
+
<section id="ha.client.automatic">
- <title>Automatic client failover</title>
+ <title>Automatic Client Failover</title>
<para>HornetQ clients can be configured with knowledge of live and backup servers, so that
in event of connection failure of the client - live server connection, the client will
- detect this and reconnect its sessions to the backup server. Because of server
- replication, then backup server will already have those sessions in the same state they
- were left on the live server and the client will be able to reconnect them and resume
- them 100% transparently as if nothing happened.</para>
- <para>For automatic failover HornetQ requires <emphasis>zero</emphasis> coding of special
- failover code on the client or server. This differs from other messaging systems which
- intrusively require you to code special failover handling code. HornetQ automatic
- failover preserves all your normal JMS or core API semantics and allows your client code
- to continue 100% uninterrupted on event of connection failure and failover from a live
- to a backup server.</para>
+ detect this and reconnect to the backup server. The backup server will have recreated the sessions
+ and consumers but it will not preserve the session state from the live server.</para>
<para>HornetQ clients detect connection failure when it has not received packets from the
server within the time given by <literal>client-failure-check-period</literal> as
explained in section <xref linkend="connection-ttl"/>. If the client does not receive
@@ -137,6 +154,8 @@
way of doing this is to use <emphasis>server discovery</emphasis> for the client to
automatically discover the list. For full details on how to configure clients please see
<xref linkend="clusters.server-discovery"/>.</para>
+ <para>To enable automatic client failover, the client must be configured to allow non-zero reconnection attempts
+ (as explained in <xref linkend="client-reconnection" />).</para>
<para>Sometimes you want a client to failover onto a backup server even if the live server
is just cleanly shutdown rather than having crashed or the connection failed. To
configure this you can set the property <literal>FailoverOnServerShutdown</literal> to
@@ -147,19 +166,18 @@
this property is <literal>false</literal>, this means that by default <emphasis>HornetQ
clients will not failover to a backup server if the live server is simply shutdown
cleanly.</emphasis></para>
- <para>For a fully functional example of automatic failover, please see <xref
- linkend="automatic-failover"/>.</para>
- </section>
+ <para>For examples of automatic failover with transacted and non-transacted JMS sessions, please see <xref
+ linkend="examples.transaction-failover"/> and <xref linkend="examples.non-transaction-failover" />.</para> </section>
<section>
- <title>Application-level client failover</title>
+ <title>Application-Level Failover</title>
<para>In some cases you may not want automatic client failover, and prefer to handle any
connection failure yourself, and code your own manually reconnection logic in your own
failure handler. We define this as <emphasis>application-level</emphasis> failover,
since the failover is handled at the user application level.</para>
- <para>If all your clients use application-level failover then you do not need server
+ <para>If all your clients use application-level failover then you do not need data
replication on the server side, and should disabled this. Server replication has some
performance overhead and should be disabled if it is not required. To disable server
- replication simply do not specify a <literal>backup-connector</literal> element for each
+ replication simply do not specify a <literal>backup-connector</literal> element on each
live server.</para>
<para>To implement application-level failover, if you're using JMS then you need to code an
<literal>ExceptionListener</literal> class on the JMS connection. The <literal
@@ -175,4 +193,5 @@
<literal>FailureListener</literal> on your core <literal>ClientSession</literal>
instances.</para>
</section>
+ </section>
</chapter>
Added: trunk/docs/user-manual/en/images/ha-replicated-store.png
===================================================================
(Binary files differ)
Property changes on: trunk/docs/user-manual/en/images/ha-replicated-store.png
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: trunk/docs/user-manual/en/images/ha-shared-store.png
===================================================================
(Binary files differ)
Property changes on: trunk/docs/user-manual/en/images/ha-shared-store.png
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
14 years, 6 months
JBoss hornetq SVN: r8154 - in branches/Clebert_Sync: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-27 21:01:50 -0400 (Tue, 27 Oct 2009)
New Revision: 8154
Added:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Backup changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-28 01:01:50 UTC (rev 8154)
@@ -69,7 +69,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public class JournalImpl implements TestableJournal
+public class JournalImpl implements TestableJournal, JournalLock
{
// Constants -----------------------------------------------------
@@ -2736,7 +2736,44 @@
lockAppend.unlock();
}
}
+
+ // JournalLock Interface ------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readLock()
+ */
+ public void readLock()
+ {
+ globalLock.readLock().lock();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readUnlock()
+ */
+ public void readUnlock()
+ {
+ globalLock.readLock().unlock();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeLock()
+ */
+ public void writeLock()
+ {
+ globalLock.writeLock().lock();
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeUnLock()
+ */
+ public void writeUnLock()
+ {
+ globalLock.writeLock().unlock();
+ }
+
+
+
// Public
// -----------------------------------------------------------------------------
Added: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java (rev 0)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java 2009-10-28 01:01:50 UTC (rev 8154)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+/**
+ * This interface is used to share the lock operations done at the Journal level.
+ *
+ * Some Journal delegates may need to make sure the global locks are shared with the caller.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalLock
+{
+ void readLock();
+
+ void readUnlock();
+
+ void writeLock();
+
+ void writeUnLock();
+}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28 01:01:50 UTC (rev 8154)
@@ -195,7 +195,7 @@
if (replicator != null)
{
- this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
+ this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, localBindings, replicator);
}
else
{
@@ -263,7 +263,7 @@
if (replicator != null)
{
- this.messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+ this.messageJournal = new ReplicatedJournal((byte)1, localMessage, localMessage, replicator);
}
else
{
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-28 01:01:50 UTC (rev 8154)
@@ -21,6 +21,7 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalLock;
import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
@@ -49,14 +50,20 @@
private final Journal localJournal;
+ private final JournalLock journalLock;
+
private final byte journalID;
- public ReplicatedJournal(final byte journaID, final Journal localJournal, final ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journaID,
+ final JournalLock journalLock,
+ final Journal localJournal,
+ final ReplicationManager replicationManager)
{
super();
journalID = journaID;
this.localJournal = localJournal;
this.replicationManager = replicationManager;
+ this.journalLock = journalLock;
}
public ReplicatedJournal(final byte journaID, final ReplicationManager replicationManager)
@@ -64,6 +71,7 @@
super();
journalID = journaID;
localJournal = null;
+ journalLock = null;
this.replicationManager = replicationManager;
}
@@ -104,11 +112,19 @@
{
trace("Append record id = " + id + " recordType = " + recordType);
}
- replicationManager.appendAddRecord(journalID, id, recordType, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendAddRecord(id, recordType, record, sync);
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecord(id, recordType, record, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
}
/**
@@ -141,11 +157,20 @@
{
trace("Append record TXid = " + id + " recordType = " + recordType);
}
- replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -160,11 +185,20 @@
{
trace("AppendCommit " + txID);
}
- replicationManager.appendCommitRecord(journalID, txID);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendCommitRecord(txID, sync);
+ replicationManager.appendCommitRecord(journalID, txID);
+ if (localJournal != null)
+ {
+ localJournal.appendCommitRecord(txID, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -179,11 +213,20 @@
{
trace("AppendDelete " + id);
}
- replicationManager.appendDeleteRecord(journalID, id);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendDeleteRecord(id, sync);
+ replicationManager.appendDeleteRecord(journalID, id);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecord(id, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -211,11 +254,20 @@
{
trace("AppendDelete txID=" + txID + " id=" + id);
}
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendDeleteRecordTransactional(txID, id, record);
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id, record);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -230,11 +282,20 @@
{
trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
}
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendDeleteRecordTransactional(txID, id);
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -262,11 +323,20 @@
{
trace("AppendPrepare txID=" + txID);
}
- replicationManager.appendPrepareRecord(journalID, txID, transactionData);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendPrepareRecord(txID, transactionData, sync);
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ if (localJournal != null)
+ {
+ localJournal.appendPrepareRecord(txID, transactionData, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -281,11 +351,20 @@
{
trace("AppendRollback " + txID);
}
- replicationManager.appendRollbackRecord(journalID, txID);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendRollbackRecord(txID, sync);
+ replicationManager.appendRollbackRecord(journalID, txID);
+ if (localJournal != null)
+ {
+ localJournal.appendRollbackRecord(txID, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -315,11 +394,20 @@
{
trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
}
- replicationManager.appendUpdateRecord(journalID, id, recordType, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendUpdateRecord(id, recordType, record, sync);
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecord(id, recordType, record, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -355,11 +443,20 @@
{
trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
}
- replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -502,6 +599,22 @@
// Private -------------------------------------------------------
+ private void preAppend()
+ {
+ if (journalLock != null)
+ {
+ journalLock.readLock();
+ }
+ }
+
+ private void afterAppend()
+ {
+ if (journalLock != null)
+ {
+ journalLock.readUnlock();
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-28 01:01:50 UTC (rev 8154)
@@ -41,6 +41,7 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalLock;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
@@ -169,7 +170,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), new FakeJournal(), manager);
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
@@ -272,11 +273,11 @@
config.setBackup(true);
ArrayList<String> intercepts = new ArrayList<String>();
-
+
intercepts.add(TestInterceptor.class.getName());
-
+
config.setInterceptorClassNames(intercepts);
-
+
HornetQServer server = new HornetQServerImpl(config);
server.start();
@@ -288,7 +289,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), new FakeJournal(), manager);
Thread.sleep(100);
TestInterceptor.value.set(false);
@@ -308,7 +309,7 @@
});
manager.closeContext();
-
+
server.stop();
assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -357,7 +358,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), new FakeJournal(), manager);
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
@@ -505,8 +506,7 @@
};
-
- static class FakeJournal implements Journal
+ static class FakeJournal implements Journal, JournalLock
{
/* (non-Javadoc)
@@ -720,5 +720,33 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readLock()
+ */
+ public void readLock()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readUnlock()
+ */
+ public void readUnlock()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeLock()
+ */
+ public void writeLock()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeUnLock()
+ */
+ public void writeUnLock()
+ {
+ }
+
}
}
14 years, 6 months
JBoss hornetq SVN: r8153 - in branches/Clebert_Sync: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-27 19:48:09 -0400 (Tue, 27 Oct 2009)
New Revision: 8153
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Log:
Backup changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -36,7 +36,7 @@
/** enable some trace at development. */
private static final boolean DEV_TRACE = true;
- private static final boolean isTraceEnabled = log.isTraceEnabled();
+ private static final boolean isTrace = log.isTraceEnabled();
private static void trace(final String msg)
{
@@ -49,7 +49,8 @@
private final Journal journalTo;
- /** Proxy mode means, everything will be copied over without any evaluations such as */
+ /** Proxy mode means, everything will be copied over without any evaluations.
+ * This is useful at the end of copying when everything needs to be copied. */
private boolean proxyMode = false;
// Static --------------------------------------------------------
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -1553,6 +1553,8 @@
// Need to make sure everything is out of executors and on the disk before backing it up
flush();
+ copier.setProxyMode(true);
+
List<JournalFile> newDataFilesToProcess = getSnapshotFilesToProcess();
Collections.sort(newDataFilesToProcess, new JournalFileComparator());
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -184,7 +184,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- Journal localBindings = new JournalImpl(1024 * 1024,
+ JournalImpl localBindings = new JournalImpl(1024 * 1024,
2,
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
@@ -252,7 +252,7 @@
this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
- Journal localMessage = new JournalImpl(config.getJournalFileSize(),
+ JournalImpl localMessage = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -17,6 +17,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -44,50 +45,67 @@
// Attributes ----------------------------------------------------
final AtomicInteger sequence = new AtomicInteger(0);
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
public void testSimpleCopy() throws Exception
{
setup(2, 100 * 1024, false);
createJournal();
startJournal();
load();
-
-
- for (int i = 0 ; i < 10; i++)
+ ArrayList<Long> transactions = new ArrayList<Long>();
+
+ ArrayList<Long> toDelete = new ArrayList<Long>();
+
+ for (int i = 0; i < 40; i++)
{
- addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ long iDelete = sequence.incrementAndGet();
+
+ toDelete.add(iDelete);
+
+ addWithSize(1024, iDelete);
}
- addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
- for (int i = 0 ; i < 10; i++)
+
+ long tx = sequence.incrementAndGet();
+ transactions.add(tx);
+ addTx(tx, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
+ for (int i = 0; i < 10; i++)
{
- addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ addWithSize(1024,
+ sequence.incrementAndGet(),
+ sequence.incrementAndGet(),
+ sequence.incrementAndGet(),
+ sequence.incrementAndGet());
}
-
- File destDir = new File(getTestDir()+"/dest");
-
+
+ File destDir = new File(getTestDir() + "/dest");
+
destDir.mkdirs();
-
+
SequentialFileFactory newFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
-
+
Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, newFactory, filePrefix, fileExtension, 1);
destJournal.start();
destJournal.loadInternalOnly();
-
+
CountDownLatch locked = new CountDownLatch(1);
-
+
JournalHandler handler = new JournalHandler(destJournal, locked, 5);
-
- final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class[]{Journal.class}, handler);
-
+
+ final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),
+ new Class[] { Journal.class },
+ handler);
+
Thread copier = new Thread()
{
+ @Override
public void run()
{
try
@@ -98,36 +116,50 @@
{
e.printStackTrace();
}
-
+
}
};
-
+
copier.start();
-
+
assertTrue(locked.await(10, TimeUnit.SECONDS));
-
+
sequence.set(5000);
-
- for (int i = 0 ; i < 10 ; i++)
+
+ for (int i = 0; i < 10; i++)
{
- addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
+ tx = sequence.incrementAndGet();
+ transactions.add(tx);
+ addTx(tx, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
journal.forceMoveNextFile();
}
-
-
-
+
+ for (Long txToCommit : transactions)
+ {
+ commit(txToCommit);
+ }
+
+ for (Long iDelete : toDelete)
+ {
+ delete(iDelete);
+ }
+
handler.unlock();
-
+
copier.join();
-
+
stopJournal();
-
+
destJournal.stop();
-
- this.fileFactory = newFactory;
-
+
+ fileFactory = newFactory;
+
+ createJournal();
+
startJournal();
-
+
loadAndCheck(true);
}
@@ -144,53 +176,47 @@
ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
1000000,
false,
- false
- );
+ false);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
-
+
/** This handler will lock after N calls, until the Handler is opened again */
protected class JournalHandler implements InvocationHandler
{
-
-
+
final VariableLatch valve = new VariableLatch();
-
+
final CountDownLatch locked;
-
+
final int executionsBeforeLock;
-
+
int executions = 0;
-
-
+
private final Journal target;
-
- public JournalHandler(Journal journal, CountDownLatch locked, int executionsBeforeLock)
+
+ public JournalHandler(final Journal journal, final CountDownLatch locked, final int executionsBeforeLock)
{
- this.target = journal;
+ target = journal;
this.locked = locked;
this.executionsBeforeLock = executionsBeforeLock;
}
-
+
private void lock()
{
valve.up();
}
-
+
public void unlock()
{
valve.down();
}
-
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable
{
- System.out.println("Invoked " + method.getName());
- if (executions ++ == executionsBeforeLock)
+ if (executions++ == executionsBeforeLock)
{
lock();
locked.countDown();
@@ -201,7 +227,7 @@
}
return method.invoke(target, args);
}
-
+
}
// Private -------------------------------------------------------
14 years, 6 months
JBoss hornetq SVN: r8152 - in branches/Clebert_Sync: tests/src/org/hornetq/tests/unit/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-27 17:13:03 -0400 (Tue, 27 Oct 2009)
New Revision: 8152
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Log:
Backup again...
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 21:13:03 UTC (rev 8152)
@@ -18,9 +18,7 @@
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.DataConstants;
/**
* This will read records
@@ -35,13 +33,12 @@
private static final Logger log = Logger.getLogger(JournalCopier.class);
-
/** enable some trace at development. */
private static final boolean DEV_TRACE = true;
-
+
private static final boolean isTraceEnabled = log.isTraceEnabled();
-
- private static void trace(String msg)
+
+ private static void trace(final String msg)
{
System.out.println("JournalCopier::" + msg);
}
@@ -52,6 +49,9 @@
private final Journal journalTo;
+ /** Proxy mode means, everything will be copied over without any evaluations such as */
+ private boolean proxyMode = false;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -62,14 +62,14 @@
* @param recordsSnapshot
* @param nextOrderingID
*/
- public JournalCopier(SequentialFileFactory fileFactory,
- JournalImpl journalFrom,
- Journal journalTo,
- Set<Long> recordsSnapshot,
- Set<Long> pendingTransactionsSnapshot)
+ public JournalCopier(final SequentialFileFactory fileFactory,
+ final JournalImpl journalFrom,
+ final Journal journalTo,
+ final Set<Long> recordsSnapshot,
+ final Set<Long> pendingTransactionsSnapshot)
{
super(fileFactory, journalFrom, recordsSnapshot, -1);
- this.pendingTransactions = pendingTransactionsSnapshot;
+ pendingTransactions = pendingTransactionsSnapshot;
this.journalTo = journalTo;
}
@@ -81,7 +81,7 @@
public void onReadAddRecord(final RecordInfo info) throws Exception
{
- if (lookupRecord(info.id))
+ if (proxyMode || lookupRecord(info.id))
{
if (DEV_TRACE)
{
@@ -100,7 +100,7 @@
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
if (DEV_TRACE)
{
@@ -118,8 +118,12 @@
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode)
{
+ journalTo.appendCommitRecord(transactionID, false);
+ }
+ else if (pendingTransactions.contains(transactionID))
+ {
// Sanity check, this should never happen
log.warn("Inconsistency during compacting: CommitRecord ID = " + transactionID +
" for an already committed transaction during compacting");
@@ -128,12 +132,17 @@
public void onReadDeleteRecord(final long recordID) throws Exception
{
+ if (proxyMode)
+ {
+ journalTo.appendDeleteRecord(recordID, false);
+ }
+ // else....
// Nothing to be done here, we don't copy deleted records
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
journalTo.appendDeleteRecordTransactional(transactionID, info.id, info.data);
}
@@ -147,7 +156,7 @@
public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
journalTo.appendPrepareRecord(transactionID, extraData, false);
}
@@ -155,6 +164,10 @@
public void onReadRollbackRecord(final long transactionID) throws Exception
{
+ if (proxyMode)
+ {
+ journalTo.appendRollbackRecord(transactionID, false);
+ }
if (pendingTransactions.contains(transactionID))
{
// Sanity check, this should never happen
@@ -165,7 +178,7 @@
public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
- if (lookupRecord(info.id))
+ if (proxyMode || lookupRecord(info.id))
{
journalTo.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
}
@@ -173,7 +186,7 @@
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
journalTo.appendUpdateRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
}
@@ -183,6 +196,11 @@
}
}
+ public void setProxyMode(final boolean proxyMode)
+ {
+ this.proxyMode = proxyMode;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 21:13:03 UTC (rev 8152)
@@ -1476,8 +1476,6 @@
compactMinFiles = 0;
autoReclaim = false;
- flushExecutor();
-
// Wait the compactor and cleanup to finish case they are running
// This will also set the compactorRunning, as clean up and compact can't happen at the same time
while (!compactorRunning.compareAndSet(false, true))
@@ -1517,8 +1515,6 @@
return;
}
- dataFiles.clear();
-
HashSet<Long> txSet = new HashSet<Long>();
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
@@ -1535,6 +1531,9 @@
Collections.sort(dataFilesToProcess, new JournalFileComparator());
+ // Need to make sure everything is out of executors and on the disk before backing it up
+ flush();
+
// This is where most of the work is done, taking most of the time of the compacting routine.
// Notice there are no locks while this is being done.
@@ -1544,9 +1543,48 @@
{
readJournalFile(fileFactory, file, copier);
}
+
+
+ // Final Freeze.... Sending the left over files (including the next file)
+ globalLock.writeLock().lock();
+
+ try
+ {
+ // Need to make sure everything is out of executors and on the disk before backing it up
+ flush();
+
+ List<JournalFile> newDataFilesToProcess = getSnapshotFilesToProcess();
+ Collections.sort(newDataFilesToProcess, new JournalFileComparator());
+
+ Iterator<JournalFile> newDataIterator = newDataFilesToProcess.iterator();
+ for (JournalFile alreadyProcessed : dataFilesToProcess)
+ {
+ JournalFile newFile = newDataIterator.next();
+
+ if (newFile.getFileID() != alreadyProcessed.getFileID())
+ {
+ log.warn("Processed FileID " + alreadyProcessed.getFileID() + " inconsistent with previous processed " + newFile.getFileID());
+ }
+ }
- copier.flush();
+ while (newDataIterator.hasNext())
+ {
+ JournalFile newFile = newDataIterator.next();
+
+ log.info("processing " + newFile.getFileID());
+
+ readJournalFile(fileFactory, newFile, copier);
+ }
+
+ }
+ finally
+ {
+ globalLock.writeLock().unlock();
+ }
+
+
+
}
finally
{
@@ -1756,6 +1794,8 @@
*/
private List<JournalFile> getSnapshotFilesToProcess() throws Exception
{
+ flush();
+
// We need to move to the next file, as we need a clear start for negatives and positives counts
moveNextFile(true);
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 21:13:03 UTC (rev 8152)
@@ -14,6 +14,11 @@
package org.hornetq.tests.unit.core.journal.impl;
import java.io.File;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -22,6 +27,7 @@
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.utils.VariableLatch;
/**
* A CopyJournalTest
@@ -47,7 +53,7 @@
public void testSimpleCopy() throws Exception
{
- setup(10, 10 * 1024, true);
+ setup(2, 100 * 1024, false);
createJournal();
startJournal();
load();
@@ -59,26 +65,70 @@
addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
}
addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
-
+ for (int i = 0 ; i < 10; i++)
+ {
+ addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ }
+
File destDir = new File(getTestDir()+"/dest");
destDir.mkdirs();
- SequentialFileFactory nioFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
+ SequentialFileFactory newFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
- Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, nioFactory, filePrefix, fileExtension, 1);
+ Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, newFactory, filePrefix, fileExtension, 1);
destJournal.start();
destJournal.loadInternalOnly();
- journal.copyTo(destJournal);
+ CountDownLatch locked = new CountDownLatch(1);
- journal.flush();
+ JournalHandler handler = new JournalHandler(destJournal, locked, 5);
- destJournal.flush();
+ final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class[]{Journal.class}, handler);
+ Thread copier = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ journal.copyTo(proxyJournal);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+ };
+ copier.start();
- System.exit(1);
+ assertTrue(locked.await(10, TimeUnit.SECONDS));
+
+ sequence.set(5000);
+
+ for (int i = 0 ; i < 10 ; i++)
+ {
+ addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ journal.forceMoveNextFile();
+ }
+
+
+
+ handler.unlock();
+
+ copier.join();
+
+ stopJournal();
+
+ destJournal.stop();
+
+ this.fileFactory = newFactory;
+
+ startJournal();
+
+ loadAndCheck(true);
}
@Override
@@ -93,7 +143,7 @@
return new AIOSequentialFileFactory(getTestDir(),
ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
1000000,
- true,
+ false,
false
);
}
@@ -101,7 +151,59 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+
+ /** This handler will lock after N calls, until the Handler is opened again */
+ protected class JournalHandler implements InvocationHandler
+ {
+
+
+ final VariableLatch valve = new VariableLatch();
+
+ final CountDownLatch locked;
+
+ final int executionsBeforeLock;
+
+ int executions = 0;
+
+
+ private final Journal target;
+
+ public JournalHandler(Journal journal, CountDownLatch locked, int executionsBeforeLock)
+ {
+ this.target = journal;
+ this.locked = locked;
+ this.executionsBeforeLock = executionsBeforeLock;
+ }
+
+ private void lock()
+ {
+ valve.up();
+ }
+
+ public void unlock()
+ {
+ valve.down();
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ System.out.println("Invoked " + method.getName());
+ if (executions ++ == executionsBeforeLock)
+ {
+ lock();
+ locked.countDown();
+ }
+ if (!valve.waitCompletion(10000))
+ {
+ throw new IllegalStateException("Timeout waiting for open valve");
+ }
+ return method.invoke(target, args);
+ }
+
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
14 years, 6 months
JBoss hornetq SVN: r8151 - in branches/Clebert_Sync: src/main/org/hornetq/core/journal/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-27 14:08:05 -0400 (Tue, 27 Oct 2009)
New Revision: 8151
Added:
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
Backup changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -78,6 +78,11 @@
// Load
long load(LoaderCallback reloadManager) throws Exception;
+
+ /** Load internal data structures and not expose any data.
+ * This is only useful if you're using the journal but not interested on the current data.
+ * Useful in situations where the journal is being replicated, copied... etc. */
+ void loadInternalOnly() throws Exception;
long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -23,7 +23,7 @@
import org.hornetq.utils.DataConstants;
/**
- * A JournalCopier
+ * This will read records
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -35,6 +35,17 @@
private static final Logger log = Logger.getLogger(JournalCopier.class);
+
+ /** enable some trace at development. */
+ private static final boolean DEV_TRACE = true;
+
+ private static final boolean isTraceEnabled = log.isTraceEnabled();
+
+ private static void trace(String msg)
+ {
+ System.out.println("JournalCopier::" + msg);
+ }
+
// Attributes ----------------------------------------------------
private final Set<Long> pendingTransactions;
@@ -72,14 +83,29 @@
{
if (lookupRecord(info.id))
{
+ if (DEV_TRACE)
+ {
+ trace("Backing add ID = " + info.id);
+ }
journalTo.appendAddRecord(info.id, info.userRecordType, info.data, false);
}
+ else
+ {
+ if (DEV_TRACE)
+ {
+ trace("Ignoring add ID = " + info.id);
+ }
+ }
}
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
if (pendingTransactions.contains(transactionID))
{
+ if (DEV_TRACE)
+ {
+ trace("Backing add TXID = " + transactionID + " id = " + info.id);
+ }
journalTo.appendAddRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
}
else
@@ -132,7 +158,7 @@
if (pendingTransactions.contains(transactionID))
{
// Sanity check, this should never happen
- log.warn("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+ log.warn("Inconsistency during copying: RollbackRecord ID = " + transactionID +
" for an already rolled back transaction during compacting");
}
}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -83,7 +83,7 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = false;
+ private static final boolean trace = log.isTraceEnabled();
/** This is to be set to true at DEBUG & development only */
private static final boolean LOAD_TRACE = false;
@@ -1358,7 +1358,36 @@
{
return fileFactory.getAlignment();
}
+
+ public synchronized void loadInternalOnly() throws Exception
+ {
+ LoaderCallback dummyLoader = new LoaderCallback()
+ {
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ };
+
+ this.load(dummyLoader);
+ }
+
/**
* @see JournalImpl#load(LoaderCallback)
*/
@@ -1450,6 +1479,7 @@
flushExecutor();
// Wait the compactor and cleanup to finish case they are running
+ // This will also set the compactorRunning, as clean up and compact can't happen at the same time
while (!compactorRunning.compareAndSet(false, true))
{
final CountDownLatch latch = new CountDownLatch(1);
@@ -1515,7 +1545,7 @@
readJournalFile(fileFactory, file, copier);
}
- compactor.flush();
+ copier.flush();
}
finally
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -1263,32 +1263,8 @@
*/
public void loadInternalOnly() throws Exception
{
- LoaderCallback dummyLoader = new LoaderCallback()
- {
-
- public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
- {
- }
-
- public void updateRecord(RecordInfo info)
- {
- }
-
- public void deleteRecord(long id)
- {
- }
-
- public void addRecord(RecordInfo info)
- {
- }
-
- public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
- {
- }
- };
-
- bindingsJournal.load(dummyLoader);
- messageJournal.load(dummyLoader);
+ bindingsJournal.loadInternalOnly();
+ messageJournal.loadInternalOnly();
}
// Public -----------------------------------------------------------------------------------
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -26,7 +26,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
-
/**
* Used by the {@link JournalStorageManager} to replicate journal calls.
*
@@ -52,9 +51,7 @@
private final byte journalID;
- public ReplicatedJournal(final byte journaID,
- final Journal localJournal,
- final ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journaID, final Journal localJournal, final ReplicationManager replicationManager)
{
super();
journalID = journaID;
@@ -62,9 +59,17 @@
this.replicationManager = replicationManager;
}
+ public ReplicatedJournal(final byte journaID, final ReplicationManager replicationManager)
+ {
+ super();
+ journalID = journaID;
+ localJournal = null;
+ this.replicationManager = replicationManager;
+ }
+
// Static --------------------------------------------------------
-
- private static void trace(String message)
+
+ private static void trace(final String message)
{
log.trace(message);
}
@@ -100,7 +105,10 @@
trace("Append record id = " + id + " recordType = " + recordType);
}
replicationManager.appendAddRecord(journalID, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecord(id, recordType, record, sync);
+ }
}
/**
@@ -134,7 +142,10 @@
trace("Append record TXid = " + id + " recordType = " + recordType);
}
replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
- localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ }
}
/**
@@ -150,7 +161,10 @@
trace("AppendCommit " + txID);
}
replicationManager.appendCommitRecord(journalID, txID);
- localJournal.appendCommitRecord(txID, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendCommitRecord(txID, sync);
+ }
}
/**
@@ -166,7 +180,10 @@
trace("AppendDelete " + id);
}
replicationManager.appendDeleteRecord(journalID, id);
- localJournal.appendDeleteRecord(id, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecord(id, sync);
+ }
}
/**
@@ -195,7 +212,10 @@
trace("AppendDelete txID=" + txID + " id=" + id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
- localJournal.appendDeleteRecordTransactional(txID, id, record);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id, record);
+ }
}
/**
@@ -211,7 +231,10 @@
trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
- localJournal.appendDeleteRecordTransactional(txID, id);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id);
+ }
}
/**
@@ -240,7 +263,10 @@
trace("AppendPrepare txID=" + txID);
}
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
- localJournal.appendPrepareRecord(txID, transactionData, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendPrepareRecord(txID, transactionData, sync);
+ }
}
/**
@@ -256,7 +282,10 @@
trace("AppendRollback " + txID);
}
replicationManager.appendRollbackRecord(journalID, txID);
- localJournal.appendRollbackRecord(txID, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendRollbackRecord(txID, sync);
+ }
}
/**
@@ -287,7 +316,10 @@
trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
}
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
- localJournal.appendUpdateRecord(id, recordType, record, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecord(id, recordType, record, sync);
+ }
}
/**
@@ -324,7 +356,10 @@
trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
}
replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
- localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ }
}
/**
@@ -339,7 +374,14 @@
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure) throws Exception
{
- return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
+ if (localJournal != null)
+ {
+ return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
+ }
+ else
+ {
+ return -1;
+ }
}
/**
@@ -350,7 +392,14 @@
*/
public long load(final LoaderCallback reloadManager) throws Exception
{
- return localJournal.load(reloadManager);
+ if (localJournal != null)
+ {
+ return localJournal.load(reloadManager);
+ }
+ else
+ {
+ return -1;
+ }
}
/**
@@ -360,7 +409,10 @@
*/
public void perfBlast(final int pages) throws Exception
{
- localJournal.perfBlast(pages);
+ if (localJournal != null)
+ {
+ localJournal.perfBlast(pages);
+ }
}
/**
@@ -369,7 +421,10 @@
*/
public void start() throws Exception
{
- localJournal.start();
+ if (localJournal != null)
+ {
+ localJournal.start();
+ }
}
/**
@@ -378,7 +433,10 @@
*/
public void stop() throws Exception
{
- localJournal.stop();
+ if (localJournal != null)
+ {
+ localJournal.stop();
+ }
}
/* (non-Javadoc)
@@ -386,7 +444,14 @@
*/
public int getAlignment() throws Exception
{
- return localJournal.getAlignment();
+ if (localJournal != null)
+ {
+ return localJournal.getAlignment();
+ }
+ else
+ {
+ return 1;
+ }
}
/* (non-Javadoc)
@@ -394,13 +459,20 @@
*/
public boolean isStarted()
{
- return localJournal.isStarted();
+ if (localJournal != null)
+ {
+ return localJournal.isStarted();
+ }
+ else
+ {
+ return true;
+ }
}
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
*/
- public void copyTo(Journal destJournal) throws Exception
+ public void copyTo(final Journal destJournal) throws Exception
{
// This would be a nonsense operation. Only the real journal can copyTo
throw new IllegalStateException("Operation Not Implemeted!");
@@ -411,9 +483,19 @@
*/
public void flush() throws Exception
{
- localJournal.flush();
+ if (localJournal != null)
+ {
+ localJournal.flush();
+ }
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+ */
+ public void loadInternalOnly() throws Exception
+ {
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -713,5 +713,12 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+ */
+ public void loadInternalOnly() throws Exception
+ {
+ }
+
}
}
Added: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java (rev 0)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.journal.impl;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+
+/**
+ * A CopyJournalTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CopyJournalTest extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ final AtomicInteger sequence = new AtomicInteger(0);
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSimpleCopy() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+
+
+ for (int i = 0 ; i < 10; i++)
+ {
+ addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ }
+ addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
+ File destDir = new File(getTestDir()+"/dest");
+
+ destDir.mkdirs();
+
+ SequentialFileFactory nioFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
+
+ Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, nioFactory, filePrefix, fileExtension, 1);
+ destJournal.start();
+ destJournal.loadInternalOnly();
+
+ journal.copyTo(destJournal);
+
+ journal.flush();
+
+ destJournal.flush();
+
+
+
+ System.exit(1);
+ }
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new AIOSequentialFileFactory(getTestDir(),
+ ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ 1000000,
+ true,
+ false
+ );
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-10-27 18:08:05 UTC (rev 8151)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.journal.impl;
+import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -26,6 +27,7 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TestableJournal;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -465,10 +467,13 @@
protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
{
System.out.println("***********************************************");
- System.out.println("Expected list:");
- for (RecordInfo info : expected)
+ if (expected != null)
{
- System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ System.out.println("Expected list:");
+ for (RecordInfo info : expected)
+ {
+ System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ }
}
if (actual != null)
{
14 years, 6 months
JBoss hornetq SVN: r8150 - trunk/docs/diagrams.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-27 12:16:44 -0400 (Tue, 27 Oct 2009)
New Revision: 8150
Added:
trunk/docs/diagrams/widgets.odg
Log:
Open Office widgets to represent HornetQ components
Added: trunk/docs/diagrams/widgets.odg
===================================================================
(Binary files differ)
Property changes on: trunk/docs/diagrams/widgets.odg
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
14 years, 6 months
JBoss hornetq SVN: r8149 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-27 12:15:44 -0400 (Tue, 27 Oct 2009)
New Revision: 8149
Modified:
trunk/docs/user-manual/en/connection-ttl.xml
Log:
renamed dead connections chapter
Modified: trunk/docs/user-manual/en/connection-ttl.xml
===================================================================
--- trunk/docs/user-manual/en/connection-ttl.xml 2009-10-27 16:14:48 UTC (rev 8148)
+++ trunk/docs/user-manual/en/connection-ttl.xml 2009-10-27 16:15:44 UTC (rev 8149)
@@ -17,11 +17,10 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="connection-ttl">
- <title>Dead Connections and Session Multiplexing</title>
+ <title>Detecting Dead Connections</title>
<para>In this section we will discuss connection time-to-live (TTL) and explain how HornetQ
deals with crashed clients and clients which have exited without cleanly closing their
- resources. We'll also discuss how HornetQ multiplexes several sessions on a single
- connection.</para>
+ resources.</para>
<section id="dead.connections">
<title>Cleaning up Dead Connection Resources on the Server</title>
<para>Before a HornetQ client application exits it is considered good practice that it
14 years, 6 months
JBoss hornetq SVN: r8148 - in trunk/examples/jms/application-layer-failover: server1 and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-27 12:14:48 -0400 (Tue, 27 Oct 2009)
New Revision: 8148
Modified:
trunk/examples/jms/application-layer-failover/server0/hornetq-configuration.xml
trunk/examples/jms/application-layer-failover/server1/hornetq-configuration.xml
Log:
application layer failover example
* no need to declare the servers as clustered
Modified: trunk/examples/jms/application-layer-failover/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/application-layer-failover/server0/hornetq-configuration.xml 2009-10-27 10:45:01 UTC (rev 8147)
+++ trunk/examples/jms/application-layer-failover/server0/hornetq-configuration.xml 2009-10-27 16:14:48 UTC (rev 8148)
@@ -2,8 +2,6 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
- <clustered>true</clustered>
-
<!-- Connectors -->
<connectors>
<connector name="netty-connector">
Modified: trunk/examples/jms/application-layer-failover/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/application-layer-failover/server1/hornetq-configuration.xml 2009-10-27 10:45:01 UTC (rev 8147)
+++ trunk/examples/jms/application-layer-failover/server1/hornetq-configuration.xml 2009-10-27 16:14:48 UTC (rev 8148)
@@ -2,8 +2,6 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
- <clustered>true</clustered>
-
<!-- Connectors -->
<connectors>
14 years, 6 months
JBoss hornetq SVN: r8147 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-27 06:45:01 -0400 (Tue, 27 Oct 2009)
New Revision: 8147
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/connection-ttl.xml
Log:
removed session multiplexing from documentation
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-10-27 10:18:02 UTC (rev 8146)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-10-27 10:45:01 UTC (rev 8147)
@@ -934,13 +934,6 @@
<entry>false</entry>
</row>
<row>
- <entry><link linkend="connection-ttl.session.multiplexing"
- >connection-factory.max-connections</link></entry>
- <entry>Integer</entry>
- <entry>the maximum number of connections per factory</entry>
- <entry>8</entry>
- </row>
- <row>
<entry><link linkend="large-messages.core.config"
>connection-factory.min-large-message-size</link></entry>
<entry>Integer</entry>
Modified: trunk/docs/user-manual/en/connection-ttl.xml
===================================================================
--- trunk/docs/user-manual/en/connection-ttl.xml 2009-10-27 10:18:02 UTC (rev 8146)
+++ trunk/docs/user-manual/en/connection-ttl.xml 2009-10-27 10:45:01 UTC (rev 8147)
@@ -175,29 +175,4 @@
<para>If you do set this parameter to <literal>false</literal> please do so with
caution.</para>
</section>
- <section id="connection-ttl.session.multiplexing">
- <title>Session Multiplexing</title>
- <para>Each <literal>ClientSessionFactory</literal> creates connections on demand to the same
- server as you create sessions. Each instance will create up to a maximum of <literal
- >maxConnections</literal> connections to the same server. Subsequent sessions will
- use one of the already created connections in a round-robin fashion.</para>
- <para>To illustrate this, let's say <literal>maxConnections</literal> is set to <literal
- >8</literal>. The first eight sessions that you create will have a new underlying
- connection created for them, the next eight you create will use one of the previously
- created connections.</para>
- <para>The default value for <literal>maxConnections</literal> is <literal>8</literal>, if
- you prefer you can set it to a lower value so each factory maintains only one underlying
- connection. We choose a default value of <literal>8</literal> because on the server side
- each packet read from a particular connection is read serially by the same thread, so,
- if all traffic from the clients sessions is multiplexed on the same connection it will
- all be processed by the same thread on the server, which might not be a good use of
- cores on the server. By choosing <literal>8</literal> then different sessions traffic
- from the same client can be processed by different cores. If you have many different
- clients then this may not be relevant anyway.</para>
- <para>To change the value of <literal>maxConnections</literal> simply use the setter method
- on the <literal>ClientSessionFactory</literal> immediately after constructing it, or if
- you are using JMS use the setter on the <literal>HornetQConnectionFactory</literal> or
- specify the <literal>max-connections</literal> parameter in the connection factory xml
- configuration in <literal>hornetq-jms.xml</literal>.</para>
- </section>
</chapter>
14 years, 6 months
JBoss hornetq SVN: r8146 - in trunk: examples/jms and 20 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-27 06:18:02 -0400 (Tue, 27 Oct 2009)
New Revision: 8146
Added:
trunk/examples/jms/non-transaction-failover/
trunk/examples/jms/non-transaction-failover/build.bat
trunk/examples/jms/non-transaction-failover/build.sh
trunk/examples/jms/non-transaction-failover/build.xml
trunk/examples/jms/non-transaction-failover/readme.html
trunk/examples/jms/non-transaction-failover/server0/
trunk/examples/jms/non-transaction-failover/server0/client-jndi.properties
trunk/examples/jms/non-transaction-failover/server0/hornetq-beans.xml
trunk/examples/jms/non-transaction-failover/server0/hornetq-configuration.xml
trunk/examples/jms/non-transaction-failover/server0/hornetq-jms.xml
trunk/examples/jms/non-transaction-failover/server0/hornetq-users.xml
trunk/examples/jms/non-transaction-failover/server1/
trunk/examples/jms/non-transaction-failover/server1/KILL_ME
trunk/examples/jms/non-transaction-failover/server1/client-jndi.properties
trunk/examples/jms/non-transaction-failover/server1/hornetq-beans.xml
trunk/examples/jms/non-transaction-failover/server1/hornetq-configuration.xml
trunk/examples/jms/non-transaction-failover/server1/hornetq-jms.xml
trunk/examples/jms/non-transaction-failover/server1/hornetq-users.xml
trunk/examples/jms/non-transaction-failover/src/
trunk/examples/jms/non-transaction-failover/src/org/
trunk/examples/jms/non-transaction-failover/src/org/hornetq/
trunk/examples/jms/non-transaction-failover/src/org/hornetq/jms/
trunk/examples/jms/non-transaction-failover/src/org/hornetq/jms/example/
trunk/examples/jms/non-transaction-failover/src/org/hornetq/jms/example/NonTransactionFailoverExample.java
trunk/examples/jms/transaction-failover/
trunk/examples/jms/transaction-failover/build.bat
trunk/examples/jms/transaction-failover/build.sh
trunk/examples/jms/transaction-failover/build.xml
trunk/examples/jms/transaction-failover/readme.html
trunk/examples/jms/transaction-failover/server0/
trunk/examples/jms/transaction-failover/server0/client-jndi.properties
trunk/examples/jms/transaction-failover/server0/hornetq-beans.xml
trunk/examples/jms/transaction-failover/server0/hornetq-configuration.xml
trunk/examples/jms/transaction-failover/server0/hornetq-jms.xml
trunk/examples/jms/transaction-failover/server0/hornetq-users.xml
trunk/examples/jms/transaction-failover/server1/
trunk/examples/jms/transaction-failover/server1/KILL_ME
trunk/examples/jms/transaction-failover/server1/client-jndi.properties
trunk/examples/jms/transaction-failover/server1/hornetq-beans.xml
trunk/examples/jms/transaction-failover/server1/hornetq-configuration.xml
trunk/examples/jms/transaction-failover/server1/hornetq-jms.xml
trunk/examples/jms/transaction-failover/server1/hornetq-users.xml
trunk/examples/jms/transaction-failover/src/
trunk/examples/jms/transaction-failover/src/org/
trunk/examples/jms/transaction-failover/src/org/hornetq/
trunk/examples/jms/transaction-failover/src/org/hornetq/jms/
trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/
trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java
Modified:
trunk/.classpath
trunk/src/main/org/hornetq/core/client/ClientSession.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
failover examples
* added examples for JMS failover with and without transactions
* refactored ClientSession to flag its workDone when a consumer effectively consume a message
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-10-27 02:24:40 UTC (rev 8145)
+++ trunk/.classpath 2009-10-27 10:18:02 UTC (rev 8146)
@@ -50,6 +50,7 @@
<classpathentry kind="src" path="examples/jms/message-group/src"/>
<classpathentry kind="src" path="examples/jms/message-priority/src"/>
<classpathentry kind="src" path="examples/jms/no-consumer-buffering/src"/>
+ <classpathentry kind="src" path="examples/jms/non-transaction-failover/src"/>
<classpathentry kind="src" path="examples/jms/paging/src"/>
<classpathentry kind="src" path="examples/jms/perf/src"/>
<classpathentry kind="src" path="examples/jms/pre-acknowledge/src"/>
@@ -72,6 +73,7 @@
<classpathentry kind="src" path="examples/jms/topic-hierarchies/src"/>
<classpathentry kind="src" path="examples/jms/topic-selector-example1/src"/>
<classpathentry kind="src" path="examples/jms/topic-selector-example2/src"/>
+ <classpathentry kind="src" path="examples/jms/transaction-failover/src"/>
<classpathentry kind="src" path="examples/jms/transactional/src"/>
<classpathentry kind="src" path="examples/jms/xa-heuristic/src"/>
<classpathentry kind="src" path="examples/jms/xa-receive/src"/>
Property changes on: trunk/examples/jms/non-transaction-failover
___________________________________________________________________
Name: svn:ignore
+ build
Added: trunk/examples/jms/non-transaction-failover/build.bat
===================================================================
--- trunk/examples/jms/non-transaction-failover/build.bat (rev 0)
+++ trunk/examples/jms/non-transaction-failover/build.bat 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: trunk/examples/jms/non-transaction-failover/build.sh
===================================================================
--- trunk/examples/jms/non-transaction-failover/build.sh (rev 0)
+++ trunk/examples/jms/non-transaction-failover/build.sh 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Property changes on: trunk/examples/jms/non-transaction-failover/build.sh
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/examples/jms/non-transaction-failover/build.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/build.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/build.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project default="run" name="HornetQ JMS Non-transaction Failover Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run" depends="delete-files">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.NonTransactionFailoverExample"/>
+ <param name="hornetq.example.beans.file" value="server0 server1"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote" depends="delete-files">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.NonTransactionFailoverExample"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+ <target name="delete-files" depends="clean">
+ <delete file="./server1/KILL_ME"/>
+ </target>
+
+</project>
Added: trunk/examples/jms/non-transaction-failover/readme.html
===================================================================
--- trunk/examples/jms/non-transaction-failover/readme.html (rev 0)
+++ trunk/examples/jms/non-transaction-failover/readme.html 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,166 @@
+<html>
+ <head>
+ <title>HornetQ JMS Failover Without Transactions Example</title>
+ <link rel="stylesheet" type="text/css" href="../../common/common.css" />
+ <link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
+ <script type="text/javascript" src="../../common/prettify.js"></script>
+ </head>
+ <body onload="prettyPrint()">
+ <h1>JMS Failover Without Transactions Example</h1>
+
+ <p>This example demonstrates two servers coupled as a live-backup pair for high availability (HA), and a client
+ connection failing over from live to backup when the live server is crashed.</p>
+ <p>Failover behavior differs wether the JMS session is transacter or not.</p>
+ <p>When a <em>non-transacted</em> JMS session is used, once and only once delivery is not guaranteed
+ and it is possible some messages will be lost or delivered twice, depending when the failover to the backup server occurs.</p>
+ <p>It is up to the client to deal with such cases. To ensure once and only once delivery, the client must
+ use transacted JMS sessions (as shown in the example for <a href="../transaction-failover/readme.html">failover with transactions</a>).</p>
+ <p>For more information on HornetQ failover and HA, and clustering in general, please see the clustering
+ section of the user manual.</p>
+
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
+ <p>In this example, the live server is server 1, and the backup server is server 0</p>
+ <p>The connection will initially be created to server1, server 1 will crash, and the client will carry on
+ seamlessly on server 0, the backup server.</p>
+ <br>
+ <ol>
+ <li>Get an initial context for looking up JNDI from server #1.</li>
+ <pre class="prettyprint">
+ initialContext = getContext(1);
+ </pre>
+
+ <li>Look up the JMS resources from JNDI on server #1.</li>
+ <pre class="prettyprint">
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory connectionFactory = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ </pre>
+
+ <li>Create a JMS Connection</li>
+ <pre class="prettyprint">
+ connection = connectionFactory.createConnection();
+ </pre>
+
+ <li>Create a JMS <em>non-transacted</em> Session with client acknowledgement</li>
+ <pre class="prettyprint">
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ </pre>
+
+ <li>Start the connection to ensure delivery occurs</li>
+ <pre class="prettyprint">
+ connection.start();
+ </pre>
+
+ <li>Create a JMS MessageProducer and MessageConsumer</li>
+ <pre class="prettyprint">
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ </pre>
+
+ <li>Send some messages to server #1</li>
+ <pre class="prettyprint">
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+ </pre>
+
+ <li>Receive and acknowledge half of the sent messages</li>
+ <pre class="prettyprint">
+ TextMessage message0 = null;
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+ message0.acknowledge();
+ </pre>
+
+ <li>Receive the second half of the sent messages but <em>do not acknowledge them yet</em></li>
+ <pre class="prettyprint">
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+ </pre>
+
+ <li>Crash server #1, the live server, and wait a little while to make sure it has really crashed.</li>
+ <pre class="prettyprint">
+ killServer(1);
+ Thread.sleep(2000);
+ </pre>
+
+ <li>Acknowledging the second half of the sent messages will fail as failover to the backup server has occured</li>
+ <pre class="prettyprint">
+ try
+ {
+ message0.acknowledge();
+ }
+ catch (JMSException e)
+ {
+ System.out.println("Got exception while acknowledging message: " + e.getMessage());
+ ...
+ }
+ </pre>
+
+ <li>The client must cope with the failover and recreate the JMS session</li>
+ <pre class="prettyprint">
+ session.close();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ producer = session.createProducer(queue);
+ consumer = session.createConsumer(queue);
+ </pre>
+
+ <li>Consume again the second half of the messages againg. Note that they are not considered as redelivered</li>
+ <pre class="prettyprint">
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ message0 = (TextMessage)consumer.receive(5000);
+ System.out.printf("Got message: %s (redelivered?: %s)\n", message0.getText(), message0.getJMSRedelivered());
+ }
+ message0.acknowledge();
+ </pre>
+
+ <li>Send some more messages</li>
+ <pre class="prettyprint">
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+ </pre>
+
+ <li>Consume them</li>
+ <pre class="prettyprint">
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message.getText());
+ message.acknowledge();
+ }
+ </pre>
+
+ <li>And finally, <strong>always</strong> remember to close your resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+
+ <pre class="prettyprint">
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ }
+ </pre>
+
+ </ol>
+ </body>
+</html>
\ No newline at end of file
Property changes on: trunk/examples/jms/non-transaction-failover/server0
___________________________________________________________________
Name: svn:ignore
+ data
Added: trunk/examples/jms/non-transaction-failover/server0/client-jndi.properties
===================================================================
--- trunk/examples/jms/non-transaction-failover/server0/client-jndi.properties (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server0/client-jndi.properties 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/non-transaction-failover/server0/hornetq-beans.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server0/hornetq-beans.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server0/hornetq-beans.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/jms/non-transaction-failover/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server0/hornetq-configuration.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server0/hornetq-configuration.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,37 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <backup>true</backup>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: trunk/examples/jms/non-transaction-failover/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server0/hornetq-jms.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server0/hornetq-jms.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,18 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
Added: trunk/examples/jms/non-transaction-failover/server0/hornetq-users.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server0/hornetq-users.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server0/hornetq-users.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Property changes on: trunk/examples/jms/non-transaction-failover/server1
___________________________________________________________________
Name: svn:ignore
+ data
Added: trunk/examples/jms/non-transaction-failover/server1/KILL_ME
===================================================================
Added: trunk/examples/jms/non-transaction-failover/server1/client-jndi.properties
===================================================================
--- trunk/examples/jms/non-transaction-failover/server1/client-jndi.properties (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server1/client-jndi.properties 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:2099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/non-transaction-failover/server1/hornetq-beans.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server1/hornetq-beans.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server1/hornetq-beans.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">2099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">2098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/jms/non-transaction-failover/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server1/hornetq-configuration.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server1/hornetq-configuration.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,42 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq ../../../src/schema/hornetq-configuration.xsd">
+ <backup-connector-ref connector-name="backup-connector"/>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </connector>
+
+ <connector name="backup-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: trunk/examples/jms/non-transaction-failover/server1/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server1/hornetq-jms.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server1/hornetq-jms.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,18 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector" backup-connector-name="backup-connector"/>
+
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
Added: trunk/examples/jms/non-transaction-failover/server1/hornetq-users.xml
===================================================================
--- trunk/examples/jms/non-transaction-failover/server1/hornetq-users.xml (rev 0)
+++ trunk/examples/jms/non-transaction-failover/server1/hornetq-users.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: trunk/examples/jms/non-transaction-failover/src/org/hornetq/jms/example/NonTransactionFailoverExample.java
===================================================================
--- trunk/examples/jms/non-transaction-failover/src/org/hornetq/jms/example/NonTransactionFailoverExample.java (rev 0)
+++ trunk/examples/jms/non-transaction-failover/src/org/hornetq/jms/example/NonTransactionFailoverExample.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+
+/**
+ * A simple example that demonstrates failover of the JMS connection from one node to another
+ * when the live server crashes using a JMS <em>non-transacted</em> session.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class NonTransactionFailoverExample extends HornetQExample
+{
+ public static void main(String[] args)
+ {
+ new NonTransactionFailoverExample().run(args);
+ }
+
+ public boolean runExample() throws Exception
+ {
+ final int numMessages = 10;
+
+ Connection connection = null;
+
+ InitialContext initialContext = null;
+
+ try
+ {
+ // Step 1. Get an initial context for looking up JNDI from the server #1
+ initialContext = getContext(1);
+
+ // Step 2. Look up the JMS resources from JNDI
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory connectionFactory = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ // Step 3. Create a JMS Connection
+ connection = connectionFactory.createConnection();
+
+ // Step 4. Create a *non-transacted* JMS Session with client acknwoledgement
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Step 5. Start the connection to ensure delivery occurs
+ connection.start();
+
+ // Step 6. Create a JMS MessageProducer and a MessageConsumer
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ // Step 7. Send some messages to server #1, the live server
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+
+ // Step 8. Receive and acknowledge half of the sent messages
+ TextMessage message0 = null;
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+ message0.acknowledge();
+
+ // Step 9. Receive the 2nd half of the sent messages but *do not* acknowledge them yet
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+
+ // Step 10. Crash server #1, the live server, and wait a little while to make sure
+ // it has really crashed
+ killServer(1);
+ Thread.sleep(2000);
+
+ // Step 11. Acknowledging the 2nd half of the sent messages will fail as failover to the
+ // backup server has occured
+ try
+ {
+ message0.acknowledge();
+ }
+ catch (JMSException e)
+ {
+ System.out.println("Got exception while acknowledging message: " + e.getMessage());
+ // Step 12. Close the JMS session and recreate the JMS objects
+ session.close();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ producer = session.createProducer(queue);
+ consumer = session.createConsumer(queue);
+ }
+
+ // Step 13. Consume again the 2nd half of the messages again. Note that they are not considered as redelivered.
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ message0 = (TextMessage)consumer.receive(5000);
+ System.out.printf("Got message: %s (redelivered?: %s)\n", message0.getText(), message0.getJMSRedelivered());
+ }
+ message0.acknowledge();
+
+ // Step 14. Send some more messages
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+
+ // Step 15. Consume them
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message.getText());
+ message.acknowledge();
+ }
+
+ return true;
+ }
+ finally
+ {
+ // Step 16. Be sure to close our resources!
+
+ if (connection != null)
+ {
+ connection.close();
+ }
+
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ }
+ }
+
+}
Property changes on: trunk/examples/jms/transaction-failover
___________________________________________________________________
Name: svn:ignore
+ build
Added: trunk/examples/jms/transaction-failover/build.bat
===================================================================
--- trunk/examples/jms/transaction-failover/build.bat (rev 0)
+++ trunk/examples/jms/transaction-failover/build.bat 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: trunk/examples/jms/transaction-failover/build.sh
===================================================================
--- trunk/examples/jms/transaction-failover/build.sh (rev 0)
+++ trunk/examples/jms/transaction-failover/build.sh 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Property changes on: trunk/examples/jms/transaction-failover/build.sh
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/examples/jms/transaction-failover/build.xml
===================================================================
--- trunk/examples/jms/transaction-failover/build.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/build.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project default="run" name="HornetQ JMS Transaction Failover Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run" depends="delete-files">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.TransactionFailoverExample"/>
+ <param name="hornetq.example.beans.file" value="server0 server1"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote" depends="delete-files">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.TransactionFailoverExample"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+ <target name="delete-files" depends="clean">
+ <delete file="./server1/KILL_ME"/>
+ </target>
+
+</project>
Added: trunk/examples/jms/transaction-failover/readme.html
===================================================================
--- trunk/examples/jms/transaction-failover/readme.html (rev 0)
+++ trunk/examples/jms/transaction-failover/readme.html 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,139 @@
+<html>
+ <head>
+ <title>HornetQ JMS Failover With Transaction Example</title>
+ <link rel="stylesheet" type="text/css" href="../../common/common.css" />
+ <link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
+ <script type="text/javascript" src="../../common/prettify.js"></script>
+ </head>
+ <body onload="prettyPrint()">
+ <h1>JMS Failover With Transaction Example</h1>
+
+ <p>This example demonstrates two servers coupled as a live-backup pair for high availability (HA), and a client
+ connection failing over from live to backup when the live server is crashed.</p>
+ <p>Failover behavior differs wether the JMS session is transacter or not.</p>
+ <p>When a <em>transacted</em> JMS session is used, once-and-only once delivery is guaranteed.</p>
+ <ul>
+ <li>if the failover occurs while there is an in-flight transaction, the transaction will be flagged as <em>rollback only</em> and the JMS client will need to retry the transaction work.</li>
+ <li>if the failover occurs while there is <em>no</em> in-flight transaction, the failover will be transparent to the user.</li>
+ </ul>
+ <p>HornetQ also provides an example for <a href="../non-transactional-failover/readme.html">non-transaction failover</a>.</p>
+ <p>For more information on HornetQ failover and HA, and clustering in general, please see the clustering
+ section of the user manual.</p>
+
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
+ <p>In this example, the live server is server 1, and the backup server is server 0</p>
+ <p>The connection will initially be created to server1, server 1 will crash, and the client will carry on
+ seamlessly on server 0, the backup server.</p>
+ <br>
+ <ol>
+ <li>Get an initial context for looking up JNDI from server #1.</li>
+ <pre class="prettyprint">
+ initialContext = getContext(1);
+ </pre>
+
+ <li>Look up the JMS resources from JNDI on server #1.</li>
+ <pre class="prettyprint">
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory connectionFactory = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ </pre>
+
+ <li>Create a JMS Connection</li>
+ <pre class="prettyprint">
+ connection = connectionFactory.createConnection();
+ </pre>
+
+ <li>Create a JMS <em>transacted</em> Session</li>
+ <pre class="prettyprint">
+ Session session = connection.createSession(true, 0);
+ </pre>
+
+ <li>Start the connection to ensure delivery occurs</li>
+ <pre class="prettyprint">
+ connection.start();
+ </pre>
+
+ <li>Create a JMS MessageProducer</li>
+ <pre class="prettyprint">
+ MessageProducer producer = session.createProducer(queue);
+ </pre>
+
+ <li>Create a JMS MessageConsumer</li>
+ <pre class="prettyprint">
+ MessageConsumer consumer = session.createConsumer(queue);
+ </pre>
+
+ <li>Send some messages to server #1 and commit the session</li>
+ <pre class="prettyprint">
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+ session.commit();
+ </pre>
+
+ <li>Crash server #1, the live server, and wait a little while to make sure
+ it has really crashed.<br />
+ When server #1 crashes, the client automatically detects the failure and automatically
+ fails over from server #1 to server #0 (in your real program you wouldn't need to sleep).
+ </li>
+ <pre class="prettyprint">
+ killServer(1); // This causes the live server to crash
+ Thread.sleep(2000); // Just wait a little while to make sure the live server has really crashed.
+ </pre>
+
+ <li>The client is transparently reconnected to server #0 - the backup server.
+ As no work has been done by the session during the failover, we can <em>transparently</em> consume messages
+ from the session.</li>
+ <pre class="prettyprint">
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+ session.commit();
+ </pre>
+
+ <li>Send some more messages and commit the session</li>
+ <pre class="prettyprint">
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+ session.commit();
+ </pre>
+
+ <li>Consume them</li>
+ <pre class="prettyprint">
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+ session.commit();
+ </pre>
+
+ <li>And finally, <strong>always</strong> remember to close your resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+
+ <pre class="prettyprint">
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ }
+ </pre>
+
+ </ol>
+ </body>
+</html>
\ No newline at end of file
Property changes on: trunk/examples/jms/transaction-failover/server0
___________________________________________________________________
Name: svn:ignore
+ data
Added: trunk/examples/jms/transaction-failover/server0/client-jndi.properties
===================================================================
--- trunk/examples/jms/transaction-failover/server0/client-jndi.properties (rev 0)
+++ trunk/examples/jms/transaction-failover/server0/client-jndi.properties 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/transaction-failover/server0/hornetq-beans.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server0/hornetq-beans.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server0/hornetq-beans.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/jms/transaction-failover/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server0/hornetq-configuration.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server0/hornetq-configuration.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,37 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <backup>true</backup>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: trunk/examples/jms/transaction-failover/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server0/hornetq-jms.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server0/hornetq-jms.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,18 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector"/>
+
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
Added: trunk/examples/jms/transaction-failover/server0/hornetq-users.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server0/hornetq-users.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server0/hornetq-users.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Property changes on: trunk/examples/jms/transaction-failover/server1
___________________________________________________________________
Name: svn:ignore
+ data
Added: trunk/examples/jms/transaction-failover/server1/KILL_ME
===================================================================
Added: trunk/examples/jms/transaction-failover/server1/client-jndi.properties
===================================================================
--- trunk/examples/jms/transaction-failover/server1/client-jndi.properties (rev 0)
+++ trunk/examples/jms/transaction-failover/server1/client-jndi.properties 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:2099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/transaction-failover/server1/hornetq-beans.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server1/hornetq-beans.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server1/hornetq-beans.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">2099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">2098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/jms/transaction-failover/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server1/hornetq-configuration.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server1/hornetq-configuration.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,42 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq ../../../src/schema/hornetq-configuration.xsd">
+ <backup-connector-ref connector-name="backup-connector"/>
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </connector>
+
+ <connector name="backup-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5445" type="Integer"/>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="hornetq.remoting.netty.port" value="5446" type="Integer"/>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: trunk/examples/jms/transaction-failover/server1/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server1/hornetq-jms.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server1/hornetq-jms.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,18 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connector-ref connector-name="netty-connector" backup-connector-name="backup-connector"/>
+
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
Added: trunk/examples/jms/transaction-failover/server1/hornetq-users.xml
===================================================================
--- trunk/examples/jms/transaction-failover/server1/hornetq-users.xml (rev 0)
+++ trunk/examples/jms/transaction-failover/server1/hornetq-users.xml 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java
===================================================================
--- trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java (rev 0)
+++ trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+
+/**
+ * A simple example that demonstrates failover of the JMS connection from one node to another
+ * when the live server crashes using a JMS <em>transacted</em> session.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class TransactionFailoverExample extends HornetQExample
+{
+ public static void main(String[] args)
+ {
+ new TransactionFailoverExample().run(args);
+ }
+
+ public boolean runExample() throws Exception
+ {
+ final int numMessages = 10;
+
+ Connection connection = null;
+
+ InitialContext initialContext = null;
+
+ try
+ {
+ // Step 1. Get an initial context for looking up JNDI from the server #1
+ initialContext = getContext(1);
+
+ // Step 2. Look-up the JMS resources from JNDI
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory connectionFactory = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ // Step 3. We create a JMS Connection
+ connection = connectionFactory.createConnection();
+
+ // Step 4. We create a *transacted* JMS Session
+ Session session = connection.createSession(true, 0);
+
+ // Step 5. We start the connection to ensure delivery occurs
+ connection.start();
+
+ // Step 6. We create a JMS MessageProducer
+ MessageProducer producer = session.createProducer(queue);
+
+ // Step 7. We create a JMS MessageConsumer
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ // Step 8. We send some messages to server #1, the live server
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+ session.commit();
+
+ // Step 9. We now cause server #1, the live server to crash, and wait a little while to make sure
+ // it has really crashed
+ killServer(1);
+ Thread.sleep(2000);
+
+ // Step 10. We are now transparently reconnected to server #0, the backup server.
+ // We consume the messages sent before the crash of the live server and commit the session.
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+ session.commit();
+
+ // Step 11. We now send some more messages and commit the session
+ for (int i = numMessages; i < numMessages * 2; i++)
+ {
+ TextMessage message = session.createTextMessage("This is text message " + i);
+ producer.send(message);
+ System.out.println("Sent message: " + message.getText());
+ }
+ session.commit();
+
+ // Step 12. And consume them and commit the session
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage message0 = (TextMessage)consumer.receive(5000);
+ System.out.println("Got message: " + message0.getText());
+ }
+ session.commit();
+
+ return true;
+ }
+ finally
+ {
+ // Step 13. Be sure to close our resources!
+
+ if (connection != null)
+ {
+ System.out.println("CLOSING");
+ connection.close();
+ System.out.println("CLOSED");
+ }
+
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ }
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSession.java 2009-10-27 02:24:40 UTC (rev 8145)
+++ trunk/src/main/org/hornetq/core/client/ClientSession.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -117,6 +117,8 @@
XAResource getXAResource();
+ boolean isRollbackOnly();
+
void commit() throws HornetQException;
void rollback() throws HornetQException;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-27 02:24:40 UTC (rev 8145)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -245,6 +245,8 @@
if (m != null)
{
+ session.workDone();
+
if (m.containsProperty(FORCED_DELIVERY_MESSAGE))
{
Long seq = (Long)m.getProperty(FORCED_DELIVERY_MESSAGE);
@@ -707,6 +709,9 @@
{
return;
}
+
+ session.workDone();
+
// We pull the message from the buffer from inside the Runnable so we can ensure priority
// ordering. If we just added a Runnable with the message to the executor immediately as we get it
// we could not do that
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-27 02:24:40 UTC (rev 8145)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -475,6 +475,11 @@
workDone = false;
}
+ public boolean isRollbackOnly()
+ {
+ return rollbackOnly;
+ }
+
public void rollback() throws HornetQException
{
rollback(false);
@@ -511,6 +516,8 @@
{
start();
}
+
+ rollbackOnly = false;
}
public ClientMessage createClientMessage(final byte type,
@@ -712,8 +719,6 @@
clMessage.setFlowControlSize(message.getRequiredBufferSize());
- workDone();
-
consumer.handleMessage(message.getClientMessage());
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-27 02:24:40 UTC (rev 8145)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -462,6 +462,11 @@
{
session.rollback();
}
+
+ public boolean isRollbackOnly()
+ {
+ return session.isRollbackOnly();
+ }
public void rollback(boolean considerLastMessageAsDelivered) throws HornetQException
{
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2009-10-27 02:24:40 UTC (rev 8145)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -1028,6 +1028,12 @@
{
}
+ public boolean isRollbackOnly()
+ {
+
+ return false;
+ }
+
public void rollback() throws HornetQException
{
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-27 02:24:40 UTC (rev 8145)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-27 10:18:02 UTC (rev 8146)
@@ -222,6 +222,8 @@
fail(session, latch);
+ assertTrue(session.isRollbackOnly());
+
try
{
session.commit();
@@ -271,10 +273,92 @@
session.addFailureListener(new MyListener());
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(i % 2 == 0);
+
+ setBody(i, message);
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ fail(session, latch);
+
+ // committing again should work since didn't send anything since last commit
+
+ assertFalse(session.isRollbackOnly());
+
+ session.commit();
+
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ // Only the persistent messages will survive
+
+ if (i % 2 == 0)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertNotNull(message);
+
+ assertMessageBody(i, message);
+
+ assertEquals(i, message.getProperty("counter"));
+
+ message.acknowledge();
+ }
+ }
+
+ assertNull(consumer.receive(1000));
+
+ session.commit();
+
+ session.close();
+
+ assertEquals(0, sf.numSessions());
+
+ assertEquals(0, sf.numConnections());
+ }
+
+ public void testTransactedMessagesWithConsumerStartedBedoreFailover() throws Exception
+ {
+ ClientSessionFactoryInternal sf = getSessionFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+
+ ClientSession session = sf.createSession(false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements FailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ // create a consumer and start the session before failover
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
ClientProducer producer = session.createProducer(ADDRESS);
final int numMessages = 100;
@@ -290,15 +374,23 @@
producer.send(message);
}
+ // messages will be delivered to the consumer when the session is committed
session.commit();
+ assertFalse(session.isRollbackOnly());
+
fail(session, latch);
- // committing again should work since didn't send anything since last commit
-
session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ consumer = session.createConsumer(ADDRESS);
-
+ session.start();
+
for (int i = 0; i < numMessages; i++)
{
// Only the persistent messages will survive
@@ -327,7 +419,7 @@
assertEquals(0, sf.numConnections());
}
-
+
public void testTransactedMessagesConsumedSoRollback() throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();
@@ -389,6 +481,8 @@
fail(session2, latch);
+ assertTrue(session2.isRollbackOnly());
+
try
{
session2.commit();
@@ -474,6 +568,8 @@
fail(session2, latch);
+ assertFalse(session2.isRollbackOnly());
+
consumer = session2.createConsumer(ADDRESS);
for (int i = numMessages / 2; i < numMessages; i++)
@@ -1637,7 +1733,8 @@
try
{
- session.commit();
+ session.commit();
+ fail("commit succeeded");
}
catch (HornetQException e2)
{
14 years, 6 months