Author: timfox
Date: 2009-12-05 07:10:28 -0500 (Sat, 05 Dec 2009)
New Revision: 8581
Modified:
trunk/docs/user-manual/en/ha.xml
trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
Fixed the transaction failover example and made some small changes to transaction rollback handling
Modified: trunk/docs/user-manual/en/ha.xml
===================================================================
--- trunk/docs/user-manual/en/ha.xml 2009-12-05 11:25:58 UTC (rev 8580)
+++ trunk/docs/user-manual/en/ha.xml 2009-12-05 12:10:28 UTC (rev 8581)
@@ -68,8 +68,8 @@
                     hornetq-configuration.xml</literal>, configure the live server with
                     knowledge of its backup server. This is done by specifying a <literal
                     >backup-connector-ref</literal> element. This element references a
-                    connector, also specified on the live server which specifies how
-                    to connect to the backup server.</para>
+                    connector, also specified on the live server which specifies how to connect
+                    to the backup server.</para>
                 <para>Here's a snippet from live server's <literal
                     >hornetq-configuration.xml</literal> configured to connect to its backup
                     server:</para>
@@ -86,8 +86,8 @@
 </connector>
 </connectors></programlisting>
                 <para>Secondly, on the backup server, we flag the server as a backup and make
-                    sure it has an acceptor that the live server can connect to. We also make sure the shared-store paramater is
-                    set to false:</para>
+                    sure it has an acceptor that the live server can connect to. We also make
+                    sure the shared-store parameter is set to false:</para>
                 <programlisting>
 <backup>true</backup>
@@ -104,8 +104,8 @@
                 <para>For a backup server to function correctly it's also important that it has
                     the same set of bridges, predefined queues, cluster connections, broadcast
                     groups and discovery groups as defined on the live node. The easiest way to
-                    ensure this is to copy the entire server side configuration from live
-                    to backup and just make the changes as specified above.</para>
+                    ensure this is to copy the entire server side configuration from live to
+                    backup and just make the changes as specified above.</para>
             </section>
             <section>
                 <title>Synchronizing a Backup Node to a Live Node</title>
@@ -247,46 +247,52 @@
                     linkend="examples.non-transaction-failover"/>.</para>
                 <section id="ha.automatic.failover.noteonreplication">
                     <title>A Note on Server Replication</title>
-                    <para>HornetQ does not replicate full server state betwen live and backup servers.
-                        When the new session is automatically recreated on the backup it won't have
-                        any knowledge of messages already sent or acknowledged in that session. Any
-                        inflight sends or acknowledgements at the time of failover might also be
+                    <para>HornetQ does not replicate full server state between live and backup servers.
+                        When the new session is automatically recreated on the backup it won't have any
+                        knowledge of messages already sent or acknowledged in that session. Any
+                        in-flight sends or acknowledgements at the time of failover might also be
                         lost.</para>
                     <para>By replicating full server state, theoretically we could provide a 100%
                         transparent seamless failover, which would avoid any lost messages or
                         acknowledgements, however this comes at a great cost: replicating the full
-                        server state (including the queues, session, etc.). This would require replication of
-                        the entire server state machine; every operation on the live server would have
-                        to replicated on the replica server(s) in the exact same global order to ensure
-                        a consistent replica state. This is extremely hard to do in a performant and
-                        scalable way, especially when one considers that multiple threads are changing
-                        the live server state concurrently.</para>
-                    <para>Some solutions which provide full state machine replication use
+                        server state (including the queues, session, etc.). This would require
+                        replication of the entire server state machine; every operation on the live
+                        server would have to be replicated on the replica server(s) in the exact same
+                        global order to ensure a consistent replica state. This is extremely hard to do
+                        in a performant and scalable way, especially when one considers that multiple
+                        threads are changing the live server state concurrently.</para>
+                    <para>Some messaging systems which provide full state machine replication use
                         techniques such as <emphasis role="italic">virtual synchrony</emphasis>, but
                         this does not scale well and effectively serializes all operations to a single
                         thread, dramatically reducing concurrency.</para>
                     <para>Other techniques for multi-threaded active replication exist such as
                         replicating lock states or replicating thread scheduling but this is very hard
                         to achieve at a Java level.</para>
-                    <para>Consequently it xas decided it was not worth massively reducing performance and
-                        concurrency for the sake of 100% transparent failover. Even without 100%
+                    <para>Consequently it was decided it was not worth massively reducing performance
+                        and concurrency for the sake of 100% transparent failover. Even without 100%
                         transparent failover, it is simple to guarantee <emphasis role="italic">once and
-                        only once</emphasis> delivery, even in the case of failure, by
-                        using a combination of duplicate detection and retrying of transactions. However
-                        this is not 100% transparent to the client code.</para>
+                        only once</emphasis> delivery, even in the case of failure, by using a
+                        combination of duplicate detection and retrying of transactions. However this is
+                        not 100% transparent to the client code.</para>
                 </section>
                 <section id="ha.automatic.failover.blockingcalls">
                     <title>Handling Blocking Calls During Failover</title>
-                    <para>If the client code is in a blocking call to the server, waiting for
-                        a response to continue its execution, when failover occurs, the new session
-                        will not have any knowledge of the call that was in progress. This call might
-                        otherwise hang for ever, waiting for a response that will never come.</para>
-                    <para>To prevent this, HornetQ will unblock any blocking calls that were in
-                        progress at the time of failover by making them throw a <literal
+                    <para>If the client code is in a blocking call to the server, waiting for a response
+                        to continue its execution, when failover occurs, the new session will not have
+                        any knowledge of the call that was in progress. This call might otherwise hang
+                        forever, waiting for a response that will never come.</para>
+                    <para>To prevent this, HornetQ will unblock any blocking calls that were in progress
+                        at the time of failover by making them throw a <literal
                         >javax.jms.JMSException</literal> (if using JMS), or a <literal
                         >HornetQException</literal> with error code <literal
                         >HornetQException.UNBLOCKED</literal>. It is up to the client code to catch
                         this exception and retry any operations if desired.</para>
+                    <para>If the method being unblocked is a call to commit(), or prepare(), then the
+                        transaction will be automatically rolled back and HornetQ will throw a <literal
+                        >javax.jms.TransactionRolledBackException</literal> (if using JMS), or a
+                        <literal>HornetQException</literal> with error code <literal
+                        >HornetQException.TRANSACTION_ROLLED_BACK</literal> if using the core
+                        API.</para>
                 </section>
<section id="ha.automatic.failover.transactions">
<title>Handling Failover With Transactions</title>
@@ -302,15 +308,15 @@
                     <para>It is up to the user to catch the exception, and perform any client side local
                         rollback code as necessary. The user can then just retry the transactional
                         operations again on the same session.</para>
-                    <para>HornetQ ships with a fully functioning example demonstrating how to do this, please
-                        see <xref linkend="examples.transaction-failover"/></para>
+                    <para>HornetQ ships with a fully functioning example demonstrating how to do this,
+                        please see <xref linkend="examples.transaction-failover"/></para>
                     <para>If failover occurs when a commit call is being executed, the server, as
                         previously described, will unblock the call to prevent a hang, since no response
-                        will come back. In this case it is not easy for the
-                        client to determine whether the transaction commit was actually processed on the
-                        live server before failure occurred.</para>
+                        will come back. In this case it is not easy for the client to determine whether
+                        the transaction commit was actually processed on the live server before failure
+                        occurred.</para>
                     <para>To remedy this, the client can simply enable duplicate detection (<xref
-                        linkend="duplicate-detection"/>) in the transaction, and retry the
+                            linkend="duplicate-detection"/>) in the transaction, and retry the
                         transaction operations again after the call is unblocked. If the transaction had
                         indeed been committed on the live server successfully before failover, then when
                         the transaction is retried, duplicate detection will ensure that any persistent
@@ -325,14 +331,12 @@
             </section>
             <section id="ha.automatic.failover.nontransactional">
                 <title>Handling Failover With Non Transactional Sessions</title>
-                <para>If the session is non transactional, messages or
-                    acknowledgements can be lost in the event of failover.</para>
+                <para>If the session is non transactional, messages or acknowledgements can be lost
+                    in the event of failover.</para>
                 <para>If you wish to provide <emphasis role="italic">once and only once</emphasis>
-                    delivery guarantees for non transacted sessions too, then make sure you send
-                    messages blocking, enabled duplicate detection, and catch unblock exceptions as
-                    described in <xref linkend="ha.automatic.failover.blockingcalls"/></para>
-                <para>However bear in mind that sending messages and acknowledgements blocking will
-                    incur performance penalties due to the network round trip involved.</para>
+                    delivery guarantees for non transacted sessions too, enable duplicate
+                    detection, and catch unblock exceptions as described in <xref
+                        linkend="ha.automatic.failover.blockingcalls"/></para>
             </section>
<section>
@@ -365,8 +369,8 @@
                         server.</para>
                 <para>For a working example of application-level failover, please see <xref
                         linkend="application-level-failover"/>.</para>
-                <para>If you are using the core API, then the procedure is very similar: you would set
-                    a <literal>FailureListener</literal> on the core <literal>ClientSession</literal>
+                <para>If you are using the core API, then the procedure is very similar: you would set a
+                    <literal>FailureListener</literal> on the core <literal>ClientSession</literal>
                     instances.</para>
             </section>
         </section>
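The retry-after-rollback pattern the updated ha.xml text describes can be sketched in plain Java. This is a self-contained simulation, not HornetQ code: `FlakySession` and `RolledBackException` are illustrative stand-ins for a transacted JMS `Session` and `javax.jms.TransactionRolledBackException`, and the first commit is made to fail as if failover rolled the transaction back.

```java
public class CommitRetrySketch
{
   // Stand-in for javax.jms.TransactionRolledBackException
   static class RolledBackException extends Exception
   {
   }

   // Stand-in for a transacted session whose first commit is lost to failover
   static class FlakySession
   {
      private boolean failNextCommit = true;

      private int pending;

      int committedMessages;

      void send(final int n)
      {
         pending += n;
      }

      void commit() throws RolledBackException
      {
         if (failNextCommit)
         {
            // Failover: the tx is rolled back and all pending work is discarded
            failNextCommit = false;
            pending = 0;
            throw new RolledBackException();
         }
         committedMessages += pending;
         pending = 0;
      }
   }

   // The client-side pattern: catch the rollback exception, resend, commit again
   static int sendWithRetry(final FlakySession session, final int numMessages)
   {
      session.send(numMessages);
      try
      {
         session.commit();
      }
      catch (RolledBackException e)
      {
         // Transaction was rolled back on failover - resend everything and retry
         session.send(numMessages);
         try
         {
            session.commit();
         }
         catch (RolledBackException e2)
         {
            throw new IllegalStateException("retry failed", e2);
         }
      }
      return session.committedMessages;
   }

   public static void main(final String[] args)
   {
      System.out.println(sendWithRetry(new FlakySession(), 10));
   }
}
```

With real JMS the resend would also set the duplicate-detection header on each message, so a commit that actually reached the live server before the crash does not produce duplicates.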
Modified: trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java
===================================================================
--- trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java	2009-12-05 11:25:58 UTC (rev 8580)
+++ trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java	2009-12-05 12:10:28 UTC (rev 8581)
@@ -14,6 +14,7 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -23,6 +24,7 @@
import javax.naming.InitialContext;
import org.hornetq.common.example.HornetQExample;
+import org.hornetq.core.message.impl.MessageImpl;
/**
 * A simple example that demonstrates failover of the JMS connection from one node to another
@@ -59,7 +61,7 @@
// Step 4. We create a *transacted* JMS Session
Session session = connection.createSession(true, 0);
-
+
// Step 5. We start the connection to ensure delivery occurs
connection.start();
@@ -76,25 +78,37 @@
       try
       {
          session.commit();
-      } catch (TransactionRolledBackException e)
+      }
+      catch (TransactionRolledBackException e)
       {
          System.err.println("transaction has been rolled back: " + e.getMessage());
       }

       // Step 10. We resend all the messages
       sendMessages(session, producer, numMessages, false);
-      // Step 11. We commit the session succesfully: the messages will be all delivered to the activated backup server
+
+      // Step 11. We commit the session successfully: the messages will all be delivered to the activated backup
+      // server
       session.commit();
-
       // Step 12. We are now transparently reconnected to server #0, the backup server.
       // We consume the messages sent before the crash of the live server and commit the session.
       for (int i = 0; i < numMessages; i++)
       {
          TextMessage message0 = (TextMessage)consumer.receive(5000);
+
+         if (message0 == null)
+         {
+            System.err.println("Example failed - message wasn't received");
+
+            return false;
+         }
+
          System.out.println("Got message: " + message0.getText());
       }
+
       session.commit();
+
       System.out.println("Other message on the server? " + consumer.receive(5000));

       return true;
@@ -121,19 +135,36 @@
       for (int i = 0; i < numMessages / 2; i++)
       {
          TextMessage message = session.createTextMessage("This is text message " + i);
+
+         // We set the duplicate detection header - so the server will ignore the same message
+         // if sent again after failover
+
+         message.setStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID.toString(), "uniqueid" + i);
+
          producer.send(message);
+
          System.out.println("Sent message: " + message.getText());
       }
+
       if (killServer)
       {
          killServer(1);
+
          Thread.sleep(2000);
       }
+
       // We send the remaining half of messages
       for (int i = numMessages / 2; i < numMessages; i++)
       {
          TextMessage message = session.createTextMessage("This is text message " + i);
+
+         // We set the duplicate detection header - so the server will ignore the same message
+         // if sent again after failover
+
+         message.setStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID.toString(), "uniqueid" + i);
+
          producer.send(message);
+
          System.out.println("Sent message: " + message.getText());
       }
    }
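The example above relies on the server dropping any message whose duplicate-detection ID it has already processed, which is what makes the blanket resend after failover safe. A minimal sketch of that server-side behaviour, as an illustration only (HornetQ's real duplicate ID cache is bounded and persisted, unlike this in-memory set):

```java
import java.util.HashSet;
import java.util.Set;

public class DuplicateDetectionSketch
{
   private final Set<String> seenIds = new HashSet<String>();

   private int delivered;

   // Returns true if the message was accepted, false if dropped as a duplicate
   public boolean send(final String duplicateId)
   {
      if (!seenIds.add(duplicateId))
      {
         // This duplicate ID was already processed - ignore the resend
         return false;
      }
      delivered++;
      return true;
   }

   public int delivered()
   {
      return delivered;
   }

   public static void main(final String[] args)
   {
      DuplicateDetectionSketch server = new DuplicateDetectionSketch();
      for (int i = 0; i < 5; i++)
      {
         server.send("uniqueid" + i); // first attempt, before the crash
      }
      for (int i = 0; i < 5; i++)
      {
         server.send("uniqueid" + i); // blanket resend after failover
      }
      System.out.println(server.delivered()); // duplicates were dropped
   }
}
```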
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-05 11:25:58 UTC (rev 8580)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-05 12:10:28 UTC (rev 8581)
@@ -454,6 +454,14 @@
    {
       return this;
    }
+
+   private void rollbackOnFailover() throws HornetQException
+   {
+      rollback(false);
+
+      throw new HornetQException(TRANSACTION_ROLLED_BACK,
+                                 "The transaction was rolled back on failover to a backup server");
+   }

    public void commit() throws HornetQException
    {
@@ -461,15 +469,25 @@

       if (rollbackOnly)
       {
-         rollback(false);
-
-         throw new HornetQException(TRANSACTION_ROLLED_BACK,
-                                    "The transaction was rolled back on failover to a backup server");
+         rollbackOnFailover();
       }

       flushAcks();

-      channel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT));
+      try
+      {
+         channel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT));
+      }
+      catch (HornetQException e)
+      {
+         if (e.getCode() == HornetQException.UNBLOCKED)
+         {
+            // The call to commit was unblocked on failover, we therefore roll back the tx
+            // and throw a transaction rolled back exception instead
+
+            rollbackOnFailover();
+         }
+         else
+         {
+            // Any other failure must not be swallowed
+            throw e;
+         }
+      }

       workDone = false;
    }
@@ -732,8 +750,7 @@
    public void close() throws HornetQException
    {
       if (closed)
-      {
-         log.info("Already closed so not closing");
+      {
          return;
       }
@@ -990,6 +1007,8 @@
if (rollbackOnly)
{
+ rollback(xid);
+
throw new XAException(XAException.XA_RBOTHER);
}
@@ -1012,6 +1031,23 @@
catch (HornetQException e)
{
log.warn(e.getMessage(), e);
+
+ if (e.getCode() == HornetQException.UNBLOCKED)
+ {
+ //Unblocked on failover
+
+ try
+ {
+ rollback(false);
+ }
+ catch (HornetQException e2)
+ {
+ throw new XAException(XAException.XAER_RMERR);
+ }
+
+ throw new XAException(XAException.XA_RBOTHER);
+ }
+
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
@@ -1122,7 +1158,7 @@
public int prepare(final Xid xid) throws XAException
{
checkXA();
-
+
if (rollbackOnly)
{
throw new XAException(XAException.XA_RBOTHER);
@@ -1148,6 +1184,24 @@
}
catch (HornetQException e)
{
+ log.warn(e.getMessage(), e);
+
+ if (e.getCode() == HornetQException.UNBLOCKED)
+ {
+ //Unblocked on failover
+
+ try
+ {
+ rollback(false);
+ }
+ catch (HornetQException e2)
+ {
+ throw new XAException(XAException.XAER_RMERR);
+ }
+
+ throw new XAException(XAException.XA_RBOTHER);
+ }
+
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
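The essential change to ClientSessionImpl.commit() above is an error-code translation: an UNBLOCKED failure from the blocking commit call becomes a local rollback plus a TRANSACTION_ROLLED_BACK exception, so callers see one uniform signal that the transaction must be retried. A simplified, self-contained sketch of that translation, with illustrative stand-ins for the real HornetQ exception and channel types:

```java
public class UnblockedCommitSketch
{
   // Illustrative error codes, not the real HornetQException constants
   static final int UNBLOCKED = 1;

   static final int TRANSACTION_ROLLED_BACK = 2;

   static class CodedException extends Exception
   {
      final int code;

      CodedException(final int code)
      {
         this.code = code;
      }
   }

   // Stand-in for the channel carrying the blocking SESS_COMMIT packet
   interface Channel
   {
      void sendCommitBlocking() throws CodedException;
   }

   static boolean rolledBack;

   static void commit(final Channel channel) throws CodedException
   {
      try
      {
         channel.sendCommitBlocking();
      }
      catch (CodedException e)
      {
         if (e.code == UNBLOCKED)
         {
            // The commit was unblocked on failover: roll back locally and
            // rethrow as a transaction-rolled-back failure (mirrors
            // rollbackOnFailover() in the diff above)
            rolledBack = true;
            throw new CodedException(TRANSACTION_ROLLED_BACK);
         }
         throw e;
      }
   }
}
```

In the XA paths (commit(Xid) and prepare) the same situation is reported as `XAException.XA_RBOTHER` instead, matching the transaction manager's expectations.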
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java	2009-12-05 11:25:58 UTC (rev 8580)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor2.java	2009-12-05 12:10:28 UTC (rev 8581)
@@ -31,11 +31,15 @@
 {
    private static final Logger log = Logger.getLogger(DelayInterceptor2.class);

+   private volatile boolean loseResponse = true;
+
    public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
    {
-      if (packet.getType() == PacketImpl.NULL_RESPONSE)
+      if (packet.getType() == PacketImpl.NULL_RESPONSE && loseResponse)
       {
-         //Lose the response from the commit
+         // Lose the response from the commit - only lose the first one
+
+         loseResponse = false;

          return false;
       }
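The DelayInterceptor2 change above turns an always-drop filter into a one-shot: only the first NULL_RESPONSE packet is lost, so the retried commit can succeed. The pattern in isolation, with an illustrative int in place of HornetQ's packet types:

```java
public class OneShotDropSketch
{
   // Illustrative packet type, not the real PacketImpl constant
   static final int NULL_RESPONSE = 21;

   private volatile boolean loseResponse = true;

   // Returns false to drop the packet, true to let it through
   public boolean intercept(final int packetType)
   {
      if (packetType == NULL_RESPONSE && loseResponse)
      {
         // Only lose the first matching response; later ones pass through
         loseResponse = false;

         return false;
      }
      return true;
   }
}
```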
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-12-05 11:25:58 UTC (rev 8580)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-12-05 12:10:28 UTC (rev 8581)
@@ -44,7 +44,6 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.SimpleString;
/**
@@ -1952,30 +1951,32 @@
{
sf.addInterceptor(interceptor);
+ log.info("attempting commit");
session.commit();
}
catch (HornetQException e)
{
- if (e.getCode() == HornetQException.UNBLOCKED)
+ if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
{
+ log.info("got transaction rolled back");
+
// Ok - now we retry the commit after removing the interceptor
sf.removeInterceptor(interceptor);
try
{
+ log.info("trying to commit again");
session.commit();
- fail("commit succeeded");
+ log.info("committed again ok");
+
+ failed = false;
}
catch (HornetQException e2)
{
- if (e2.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
- {
- // Ok
-
- failed = false;
- }
+
}
+
}
}
}
@@ -1990,6 +1991,8 @@
Thread.sleep(500);
fail(session, latch);
+
+ log.info("connection has failed");
committer.join();
@@ -2104,7 +2107,7 @@
}
catch (HornetQException e)
{
- if (e.getCode() == HornetQException.UNBLOCKED)
+ if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
{
// Ok - now we retry the commit after removing the interceptor
@@ -2113,15 +2116,11 @@
try
{
session.commit();
+
+ failed = false;
}
catch (HornetQException e2)
- {
- if (e2.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
- {
- // Ok
-
- failed = false;
- }
+ {
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java	2009-12-05 11:25:58 UTC (rev 8580)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java	2009-12-05 12:10:28 UTC (rev 8581)
@@ -1159,7 +1159,7 @@
protected int getNumIterations()
{
- return 5;
+ return 200000;
}
@Override