JBoss hornetq SVN: r8084 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-13 06:13:26 -0400 (Tue, 13 Oct 2009)
New Revision: 8084
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
tweak
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-13 10:13:26 UTC (rev 8084)
@@ -40,7 +40,6 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -49,11 +48,11 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
14 years, 7 months
JBoss hornetq SVN: r8083 - in trunk: docs/user-manual/en and 28 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-13 06:06:28 -0400 (Tue, 13 Oct 2009)
New Revision: 8083
Modified:
trunk/docs/user-manual/en/client-reconnection.xml
trunk/pom.xml
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/JMSExceptionHelper.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
various fixes, failover changes, fixed tests etc
Modified: trunk/docs/user-manual/en/client-reconnection.xml
===================================================================
--- trunk/docs/user-manual/en/client-reconnection.xml 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/docs/user-manual/en/client-reconnection.xml 2009-10-13 10:06:28 UTC (rev 8083)
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
-
<!-- ============================================================================= -->
<!-- Copyright © 2009 Red Hat, Inc. and others. -->
<!-- -->
@@ -17,14 +16,31 @@
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent -->
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
-
<chapter id="client-reconnection">
<title>Client Reconnection</title>
- <para>HornetQ clients can be configured to automatically reconnect to the server in the
- event that a failure is detected in the connection between the client and the server. If the
- client successfully reconnects, and the server still has a record of the clients session
- (i.e. the server was not restarted) then the client will transparently re-attach to the
- session and it will be able to resume as if nothing had happened.</para>
+ <para>HornetQ clients can be configured to automatically reconnect to the server in the event
+ that a failure is detected in the connection between the client and the server. </para>
+ <para>By default, when a client connection reconnects, HornetQ will automatically recreate any
+ sessions and consumers on the server. If a particular session is transacted and messages
+ have already been sent or acknowledged in the current transaction but not committed yet,
+ then the transaction will be marked as rollback only. This is because HornetQ cannot
+ guarantee that those messages or acks have really reached the server because of the
+ connection failure. In this case, any subsequent attempt to commit the transaction will
+ throw an exception. This exception can be caught and the transaction can be retried.</para>
+ <para>If you are using the core API, the exception thrown will be instance of HornetQException
+ with the error code TRANSACTION_ROLLED_BACK. If you are using the JMS API, the exception
+ will be a javax.jms.TransactionRolledBackException. </para>
+ <para>For a transacted session if a connection failure occurred during the call to commit(),
+ it's not possible for the client to determine if the commit was successfully processed on
+ the server before failure. In this case, if the transaction is retried after reconnection,
+ be sure to use <link linkend="duplicate-detection">duplicate detection</link> in your messages to prevent them being processed more
+ than once. </para>
+ <para>For a non transacted session, after the sessions and consumers have been recreated,
+ messages or acknowledgements that were in transit at the time of the failure might have been
+ lost. This could result in lost sent messages or duplicate delivery of messages. If you want
+ guaranteed once and only once message delivery on failure, you need to use transacted
+ session with duplicate detection.</para>
+ <para>Reattach - TODO</para>
<para>Client reconnection is also used internally by components such as core bridges to allow
them to reconnect to their target servers.</para>
<para>Client reconnection is configured using the following parameters:</para>
@@ -51,11 +67,22 @@
spaced at equal intervals.</para>
</listitem>
<listitem>
+ <para><literal>max-retry-interval</literal>. This optional parameter determines the
+ maximum retry interval that will be used. When setting <literal
+ >retry-interval-multiplier</literal> it would otherwise be possible that
+ subsequent retries exponentially increase to ridiculously large values. By setting
+ this parameter you can set an upper limit on that value. The default value is
+ <literal>2000</literal> milliseconds.</para>
+ </listitem>
+ <listitem>
<para><literal>reconnect-attempts</literal>. This optional parameter determines the
total number of reconnect attempts the bridge will make before giving up and
shutting down. A value of <literal>-1</literal> signifies an unlimited number of
attempts. The default value is <literal>-1</literal>.</para>
</listitem>
+ <listitem>
+ <para><literal>use-reattach</literal>. </para>
+ </listitem>
</itemizedlist>
<para>If you're using JMS, and you're using the JMS Service on the server to load your JMS
connection factory instances directly into JNDI, then you can specify these parameters in
@@ -69,6 +96,7 @@
</entries>
<retry-interval>1000</retry-interval>
<retry-interval-multiplier>1.5</retry-interval-multiplier>
+<max-retry-interval>60000</max-retry-interval>
<reconnect-attempts>1000</reconnect-attempts>
</connection-factory>
</programlisting>
@@ -76,8 +104,8 @@
specify the parameters using the appropriate setter methods on the <literal
>HornetQConnectionFactory</literal> immediately after creating it.</para>
<para>If you're using the core API and instantiating the <literal>ClientSessionFactory</literal>
- instance directly you can also specify the parameters using the appropriate setter methods on the
- <literal>ClientSessionFactory</literal> immediately after creating it.</para>
+ instance directly you can also specify the parameters using the appropriate setter methods
+ on the <literal>ClientSessionFactory</literal> immediately after creating it.</para>
<para>If your client does manage to reconnect but the session is no longer available on the
server, for instance if the server has been restarted or it has timed out, then the client
won't be able to re-attach, and any <literal>ExceptionListener</literal> or <literal
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/pom.xml 2009-10-13 10:06:28 UTC (rev 8083)
@@ -222,7 +222,7 @@
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
- <version>3.1.3.GA</version>
+ <version>3.1.5.GA</version>
</dependency>
<!--needed to compile the logging jar-->
<dependency>
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2009-10-13 10:06:28 UTC (rev 8083)
@@ -102,9 +102,6 @@
<xsd:element name="reconnect-attempts" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
- <xsd:element name="use-reattach" type="xsd:boolean"
- maxOccurs="1" minOccurs="0">
- </xsd:element>
<xsd:element name="failover-on-server-shutdown" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -146,10 +146,6 @@
void setReconnectAttempts(int reconnectAttempts);
- boolean isUseReattach();
-
- void setUseReattach(boolean reattach);
-
boolean isFailoverOnServerShutdown();
void setFailoverOnServerShutdown(boolean failoverOnServerShutdown);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -103,8 +103,6 @@
public static final int DEFAULT_RECONNECT_ATTEMPTS = 0;
- public static final boolean DEFAULT_USE_REATTACH = false;
-
public static final boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false;
public static final boolean DEFAULT_USE_GLOBAL_POOLS = true;
@@ -192,8 +190,6 @@
private int reconnectAttempts;
- private boolean useReattach;
-
private volatile boolean closed;
private boolean failoverOnServerShutdown;
@@ -290,8 +286,7 @@
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
- reconnectAttempts,
- useReattach,
+ reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors);
@@ -363,8 +358,6 @@
reconnectAttempts = DEFAULT_RECONNECT_ATTEMPTS;
- useReattach = DEFAULT_USE_REATTACH;
-
failoverOnServerShutdown = DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
}
@@ -666,17 +659,6 @@
this.reconnectAttempts = reconnectAttempts;
}
- public synchronized boolean isUseReattach()
- {
- return useReattach;
- }
-
- public synchronized void setUseReattach(boolean reattach)
- {
- checkWrite();
- this.useReattach = reattach;
- }
-
public synchronized boolean isFailoverOnServerShutdown()
{
return failoverOnServerShutdown;
@@ -938,8 +920,7 @@
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
- reconnectAttempts,
- useReattach,
+ reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -371,9 +371,8 @@
{
return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, browseOnly);
}
-
- public ClientConsumer createConsumer(final SimpleString queueName,
- final boolean browseOnly) throws HornetQException
+
+ public ClientConsumer createConsumer(final SimpleString queueName, final boolean browseOnly) throws HornetQException
{
return createConsumer(queueName, null, consumerWindowSize, consumerMaxRate, browseOnly);
}
@@ -382,7 +381,7 @@
{
return createConsumer(toSimpleString(queueName), toSimpleString(filterString), browseOnly);
}
-
+
public ClientConsumer createConsumer(final String queueName, final boolean browseOnly) throws HornetQException
{
return createConsumer(toSimpleString(queueName), null, browseOnly);
@@ -580,7 +579,7 @@
if (!started)
{
for (ClientConsumerInternal clientConsumerInternal : consumers.values())
- {
+ {
clientConsumerInternal.start();
}
@@ -691,7 +690,7 @@
public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException
{
- consumers.remove(consumer.getID());
+ consumers.remove(consumer.getID());
}
public void removeProducer(final ClientProducerInternal producer)
@@ -786,17 +785,14 @@
sendAckHandler = handler;
}
- // Needs to be synchronized to prevent issues with occurring concurrently with close()
-
- //TODO - need to reenable
- public synchronized boolean handleReattach(final RemotingConnection backupConnection)
+ // Needs to be synchronized to prevent issues with occurring concurrently with close()
+
+ public synchronized void handleFailover(final RemotingConnection backupConnection)
{
if (closed)
{
- return true;
+ return;
}
-
- boolean ok = false;
// We lock the channel to prevent any packets to be added to the resend
// cache during the failover process
@@ -805,7 +801,7 @@
try
{
channel.transferConnection(backupConnection);
-
+
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
@@ -816,115 +812,44 @@
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
- if (response.isSessionFound())
- {
- channel.replayCommands(response.getLastReceivedCommandID(), channel.getID());
+ if (response.isReattached())
+ {
+ // The session was found on the server - we reattached transparently ok
- ok = true;
+ channel.replayCommands(response.getLastReceivedCommandID(), channel.getID());
}
else
{
- if (closedSent)
- {
- // a session re-attach may fail, if the session close was sent before failover started, hit the server,
- // processed, then before the response was received back, failover occurred, re-attach was attempted. in
- // this case it's ok - we don't want to call any failure listeners and we don't want to halt the rest of
- // the failover process.
- //
- // however if session re-attach fails and the session was not in a call to close, then we DO want to call
- // the session listeners so we return false
- //
- // Also session reattach will fail if the server is restarted - so the session is lost
- ok = true;
- }
- else
- {
- log.warn(System.identityHashCode(this) + " Session not found on server when attempting to re-attach");
- }
+ // The session wasn't found on the server - probably we're failing over onto a backup server where the
+ // session
+ // won't exist or the target server has been restarted - in this case the session will need to be recreated,
+ // and we'll need to recreate any consumers
+
+ Packet createRequest = new CreateSessionMessage(name,
+ channel.getID(),
+ version,
+ username,
+ password,
+ minLargeMessageSize,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ producerWindowSize);
- channel.returnBlocking();
- }
-
- }
- catch (Throwable t)
- {
- log.error("Failed to handle failover", t);
- }
- finally
- {
- channel.unlock();
- }
-
- return ok;
- }
-
- public void workDone()
- {
- workDone = true;
- }
-
- // Needs to be synchronized to prevent issues with occurring concurrently with close()
- public synchronized boolean handleFailover(final RemotingConnection backupConnection)
- {
- if (closed)
- {
- return true;
- }
-
- boolean ok = false;
+ channel1.sendBlocking(createRequest);
- // Need to stop all consumers outside the lock
- for (ClientConsumerInternal consumer : consumers.values())
- {
- try
- {
- consumer.stop();
- }
- catch (HornetQException e)
- {
- log.error("Failed to stop consumer", e);
- }
-
- consumer.clearAtFailover();
- }
-
- // We lock the channel to prevent any packets being sent during the failover process
- channel.lock();
-
- try
- {
- channel.transferConnection(backupConnection);
-
- remotingConnection = backupConnection;
-
- Packet request = new CreateSessionMessage(name,
- channel.getID(),
- version,
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- producerWindowSize);
-
- Channel channel1 = backupConnection.getChannel(1, -1, false);
-
- CreateSessionResponseMessage response = (CreateSessionResponseMessage)channel1.sendBlocking(request);
-
- if (response.isCreated())
- {
- // Session was created ok
-
- // Now we need to recreate the consumers
-
+ channel.clearCommands();
+
for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
{
SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
- entry.getValue().getQueueName(),
- entry.getValue().getFilterString(),
- entry.getValue().isBrowseOnly(),
+ entry.getValue()
+ .getQueueName(),
+ entry.getValue()
+ .getFilterString(),
+ entry.getValue()
+ .isBrowseOnly(),
false);
createConsumerRequest.setChannelID(channel.getID());
@@ -936,23 +861,24 @@
createConsumerRequest.encode(buffer);
conn.write(buffer, false);
-
+
int clientWindowSize = calcWindowSize(entry.getValue().getClientWindowSize());
-
+
if (clientWindowSize != 0)
{
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(), clientWindowSize);
-
+ SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+ clientWindowSize);
+
packet.setChannelID(channel.getID());
-
+
buffer = conn.createBuffer(packet.getRequiredBufferSize());
packet.encode(buffer);
- conn.write(buffer, false);
+ conn.write(buffer, false);
}
}
-
+
if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
// Session is transacted - set for rollback only
@@ -968,30 +894,23 @@
{
consumer.start();
}
-
+
Packet packet = new PacketImpl(PacketImpl.SESS_START);
-
+
packet.setChannelID(channel.getID());
-
+
Connection conn = channel.getConnection().getTransportConnection();
-
+
HornetQBuffer buffer = conn.createBuffer(packet.getRequiredBufferSize());
packet.encode(buffer);
- conn.write(buffer, false);
+ conn.write(buffer, false);
}
-
- ok = true;
+
+ channel.returnBlocking();
}
- else
- {
- // This means the server we failed onto is not ready to take new sessions - perhaps it hasn't actually
- // failed over
- }
- // We cause any blocking calls to return - since they won't get responses.
- channel.returnBlocking();
}
catch (Throwable t)
{
@@ -1001,10 +920,13 @@
{
channel.unlock();
}
-
- return ok;
}
+ public void workDone()
+ {
+ workDone = true;
+ }
+
public void returnBlocking()
{
channel.returnBlocking();
@@ -1033,7 +955,7 @@
public void commit(final Xid xid, final boolean onePhase) throws XAException
{
checkXA();
-
+
if (rollbackOnly)
{
throw new XAException(XAException.XA_RBOTHER);
@@ -1049,7 +971,7 @@
SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
workDone = false;
-
+
if (response.isError())
{
throw new XAException(response.getResponseCode());
@@ -1065,12 +987,12 @@
public void end(final Xid xid, final int flags) throws XAException
{
checkXA();
-
+
if (rollbackOnly)
{
throw new XAException(XAException.XA_RBOTHER);
}
-
+
try
{
Packet packet;
@@ -1391,10 +1313,10 @@
{
throw new IllegalArgumentException("Invalid window size " + windowSize);
}
-
+
return clientWindowSize;
}
-
+
/**
* @param queueName
* @param filterString
@@ -1410,10 +1332,14 @@
final boolean browseOnly) throws HornetQException
{
checkClosed();
-
+
long consumerID = idGenerator.generateID();
- SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
+ SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID,
+ queueName,
+ filterString,
+ browseOnly,
+ true);
channel.sendBlocking(request);
@@ -1422,7 +1348,7 @@
// The value we send is just a hint
int clientWindowSize = calcWindowSize(windowSize);
-
+
ClientConsumerInternal consumer = new ClientConsumerImpl(this,
consumerID,
queueName,
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -53,9 +53,9 @@
void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage continuation) throws Exception;
- boolean handleFailover(RemotingConnection backupConnection);
+ void handleFailover(RemotingConnection backupConnection);
- boolean handleReattach(RemotingConnection backupConnection);
+ // boolean handleReattach(RemotingConnection backupConnection);
RemotingConnection getConnection();
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -368,16 +368,11 @@
return session.getXAResource();
}
- public boolean handleFailover(RemotingConnection backupConnection)
+ public void handleFailover(RemotingConnection backupConnection)
{
- return session.handleFailover(backupConnection);
+ session.handleFailover(backupConnection);
}
- public boolean handleReattach(RemotingConnection backupConnection)
- {
- return session.handleReattach(backupConnection);
- }
-
public void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage continuation) throws Exception
{
session.handleReceiveContinuation(consumerID, continuation);
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -135,8 +135,6 @@
private final List<Interceptor> interceptors;
- private final boolean useReattach;
-
// Static
// ---------------------------------------------------------------------------------------
@@ -153,8 +151,7 @@
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
- final int reconnectAttempts,
- final boolean useReattach,
+ final int reconnectAttempts,
final ExecutorService threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> interceptors)
@@ -198,8 +195,6 @@
this.reconnectAttempts = reconnectAttempts;
- this.useReattach = useReattach;
-
this.scheduledThreadPool = scheduledThreadPool;
this.threadPool = threadPool;
@@ -487,8 +482,6 @@
private void failoverOrReconnect(final Object connectionID, final HornetQException me)
{
- boolean done = false;
-
synchronized (failoverLock)
{
if (connection == null || connection.getID() != connectionID)
@@ -610,23 +603,14 @@
backupTransportParams = null;
- done = reattachSessions(reconnectAttempts == -1 ? -1 : reconnectAttempts + 1, false);
+ reconnectSessions(reconnectAttempts == -1 ? -1 : reconnectAttempts + 1);
}
else
{
- done = reattachSessions(reconnectAttempts, useReattach);
+ reconnectSessions(reconnectAttempts);
}
-
- if (done)
- {
- // Destroy the old connection
-
- oldConnection.destroy();
- }
- else
- {
- oldConnection.destroy();
- }
+
+ oldConnection.destroy();
}
else
{
@@ -663,7 +647,7 @@
/*
* Re-attach sessions all pre-existing sessions to the new remoting connection
*/
- private boolean reattachSessions(final int reconnectAttempts, final boolean reattach)
+ private void reconnectSessions(final int reconnectAttempts)
{
RemotingConnection backupConnection = getConnectionWithRetry(reconnectAttempts);
@@ -671,7 +655,7 @@
{
log.warn("Failed to connect to server.");
- return false;
+ return;
}
List<FailureListener> oldListeners = connection.getFailureListeners();
@@ -689,33 +673,11 @@
}
backupConnection.setFailureListeners(newListeners);
-
- boolean ok = true;
-
- // If all connections got ok, then handle failover
+
for (ClientSessionInternal session : sessions)
{
- boolean b;
-
- if (reattach)
- {
- b = session.handleReattach(backupConnection);
- }
- else
- {
- b = session.handleFailover(backupConnection);
- }
-
- if (!b)
- {
- // If a session fails to re-attach we doom the lot, but we make sure we try all sessions and don't exit
- // early
- // or connections might be left lying around
- ok = false;
- }
+ session.handleFailover(backupConnection);
}
-
- return ok;
}
private RemotingConnection getConnectionWithRetry(final int reconnectAttempts)
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -723,8 +723,6 @@
}
}
- log.info("redistribute called res is " + res);
-
return res;
}
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -60,4 +60,6 @@
void flushConfirmations();
void handlePacket(Packet packet);
+
+ void clearCommands();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -75,7 +75,7 @@
private final int confWindowSize;
- private final Semaphore sendSemaphore;
+ private volatile Semaphore sendSemaphore;
private int receivedBytes;
@@ -420,7 +420,7 @@
if (receivedBytes != 0)
{
receivedBytes = 0;
-
+
final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
confirmed.setChannelID(id);
@@ -441,6 +441,7 @@
{
receivedBytes = 0;
+
final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
confirmed.setChannelID(id);
@@ -449,6 +450,28 @@
}
}
}
+
+ public void clearCommands()
+ {
+ lastReceivedCommandID = -1;
+
+ firstStoredCommandID = 0;
+
+ resendCache.clear();
+
+ Semaphore oldSemaphore = sendSemaphore;
+
+ if (oldSemaphore != null)
+ {
+ //Reset the semaphore
+ sendSemaphore = new Semaphore(windowSize, true);
+
+ //Any threads blocking on the send semaphore should be allowed to return - we do this by just giving it
+ //a lot of permits - note we don't give it Integer.MAX_VALUE since then if if more releases come in that
+ //could end up with permit count going -ve which would cause subsequent sends to block
+ oldSemaphore.release(Integer.MAX_VALUE / 2);
+ }
+ }
public void handlePacket(final Packet packet)
{
@@ -520,11 +543,12 @@
if (packet == null)
{
- throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
+ log.warn("Can't find packet to clear: " +
" last received command id " +
lastReceivedCommandID +
" first stored command id " +
firstStoredCommandID);
+ return;
}
if (packet.getType() != PACKETS_CONFIRMED)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -30,18 +30,14 @@
private int serverVersion;
- private boolean created;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public CreateSessionResponseMessage(final boolean created, final int serverVersion)
+ public CreateSessionResponseMessage(final int serverVersion)
{
super(CREATESESSION_RESP);
- this.created = created;
-
this.serverVersion = serverVersion;
}
@@ -52,11 +48,6 @@
// Public --------------------------------------------------------
- public boolean isCreated()
- {
- return created;
- }
-
@Override
public boolean isResponse()
{
@@ -70,21 +61,19 @@
@Override
public void encodeBody(final HornetQBuffer buffer)
- {
- buffer.writeBoolean(created);
+ {
buffer.writeInt(serverVersion);
}
@Override
public void decodeBody(final HornetQBuffer buffer)
- {
- created = buffer.readBoolean();
+ {
serverVersion = buffer.readInt();
}
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_INT;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT;
}
@Override
@@ -97,7 +86,7 @@
CreateSessionResponseMessage r = (CreateSessionResponseMessage)other;
- boolean matches = super.equals(other) && serverVersion == r.serverVersion && created == r.created;
+ boolean matches = super.equals(other) && serverVersion == r.serverVersion;
return matches;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -31,19 +31,19 @@
private int lastReceivedCommandID;
- private boolean sessionFound;
+ private boolean reattached;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReattachSessionResponseMessage(final int lastReceivedCommandID, final boolean sessionFound)
+ public ReattachSessionResponseMessage(final int lastReceivedCommandID, final boolean reattached)
{
super(REATTACH_SESSION_RESP);
this.lastReceivedCommandID = lastReceivedCommandID;
- this.sessionFound = sessionFound;
+ this.reattached = reattached;
}
public ReattachSessionResponseMessage()
@@ -58,9 +58,9 @@
return lastReceivedCommandID;
}
- public boolean isSessionFound()
+ public boolean isReattached()
{
- return sessionFound;
+ return reattached;
}
public int getRequiredBufferSize()
@@ -68,17 +68,16 @@
return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + DataConstants.SIZE_BOOLEAN;
}
-
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeInt(lastReceivedCommandID);
- buffer.writeBoolean(sessionFound);
+ buffer.writeBoolean(reattached);
}
public void decodeBody(final HornetQBuffer buffer)
{
lastReceivedCommandID = buffer.readInt();
- sessionFound = buffer.readBoolean();
+ reattached = buffer.readBoolean();
}
public boolean isResponse()
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -536,10 +536,13 @@
{
// Backup server is not ready to accept connections
- return new CreateSessionResponseMessage(false, version.getIncrementingVersion());
+ return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
- securityStore.authenticate(username, password);
+ if (securityStore != null)
+ {
+ securityStore.authenticate(username, password);
+ }
ServerSession currentSession = sessions.remove(name);
@@ -581,7 +584,7 @@
channel.setHandler(handler);
- return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
+ return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
public void removeSession(final String name) throws Exception
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -11,7 +11,6 @@
* permissions and limitations under the License.
*/
-
package org.hornetq.jms.client;
import java.lang.ref.WeakReference;
@@ -244,8 +243,10 @@
public synchronized void close() throws JMSException
{
+ log.info("Closing jms connection");
if (closed)
{
+ log.info("Already closed");
return;
}
@@ -279,6 +280,7 @@
{
if (initialSession != null)
{
+ log.info("closing initial session");
initialSession.close();
}
}
@@ -541,7 +543,7 @@
try
{
initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0);
-
+
initialSession.addFailureListener(listener);
}
catch (HornetQException me)
@@ -567,7 +569,7 @@
{
return;
}
-
+
HornetQConnection conn = connectionRef.get();
if (conn != null)
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -463,16 +463,6 @@
sessionFactory.setReconnectAttempts(reconnectAttempts);
}
- public synchronized boolean isUseReattach()
- {
- return sessionFactory.isUseReattach();
- }
-
- public synchronized void setUseReattach(boolean reattach)
- {
- sessionFactory.setUseReattach(reattach);
- }
-
public synchronized boolean isFailoverOnServerShutdown()
{
return sessionFactory.isFailoverOnServerShutdown();
Modified: trunk/src/main/org/hornetq/jms/client/JMSExceptionHelper.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/JMSExceptionHelper.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/client/JMSExceptionHelper.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -78,6 +78,10 @@
je = new javax.jms.IllegalStateException(me.getMessage());
break;
+ case HornetQException.TRANSACTION_ROLLED_BACK:
+ je = new javax.jms.TransactionRolledBackException(me.getMessage());
+ break;
+
default:
je = new JMSException(me.getMessage());
}
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -158,7 +158,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -191,7 +191,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -134,7 +134,7 @@
int getThreadPoolMaxSize();
void setThreadPoolMaxSize(int threadPoolMaxSize);
-
+
long getRetryInterval();
void setRetryInterval(long retryInterval);
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -89,7 +89,7 @@
public int scheduledThreadPoolMaxSize = ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
public int threadPoolMaxSize = ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
-
+
public long retryInterval = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
public double retryIntervalMultiplier = ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -253,7 +253,7 @@
discoveryInitialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -285,7 +285,7 @@
dupsOKBatchSize,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -382,7 +382,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -451,7 +451,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -486,7 +486,7 @@
cf.setDiscoveryInitialWaitTimeout(initialWaitTimeout);
cf.setUseGlobalPools(useGlobalPools);
cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
- cf.setThreadPoolMaxSize(threadPoolMaxSize);
+ cf.setThreadPoolMaxSize(threadPoolMaxSize);
cf.setRetryInterval(retryInterval);
cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
cf.setMaxRetryInterval(maxRetryInterval);
@@ -766,7 +766,7 @@
config.getInitialWaitTimeout(),
config.isUseGlobalPools(),
config.getScheduledThreadPoolMaxSize(),
- config.getThreadPoolMaxSize(),
+ config.getThreadPoolMaxSize(),
config.getRetryInterval(),
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
@@ -798,7 +798,7 @@
config.getDupsOKBatchSize(),
config.isUseGlobalPools(),
config.getScheduledThreadPoolMaxSize(),
- config.getThreadPoolMaxSize(),
+ config.getThreadPoolMaxSize(),
config.getRetryInterval(),
config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(),
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -115,7 +115,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -148,7 +148,7 @@
@Parameter(name = "dupsOKBatchSize") int dupsOKBatchSize,
@Parameter(name = "useGlobalPools") boolean useGlobalPools,
@Parameter(name = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
- @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
@Parameter(name = "retryInterval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier") double retryIntervalMultiplier,
@Parameter(name = "maxRetryInterval") long maxRetryInterval,
@@ -194,7 +194,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -228,7 +228,7 @@
@Parameter(name = "initialWaitTimeout") long initialWaitTimeout,
@Parameter(name = "useGlobalPools") boolean useGlobalPools,
@Parameter(name = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
- @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
@Parameter(name = "retryInterval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier") double retryIntervalMultiplier,
@Parameter(name = "maxRetryInterval") long maxRetryInterval,
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -242,7 +242,7 @@
final int dupsOKBatchSize,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -279,7 +279,7 @@
dupsOKBatchSize,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -315,7 +315,7 @@
final int dupsOKBatchSize,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -354,7 +354,7 @@
dupsOKBatchSize,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -412,7 +412,7 @@
final long initialWaitTimeout,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -447,7 +447,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -483,7 +483,7 @@
final long initialWaitTimeout,
final boolean useGlobalPools,
final int scheduledThreadPoolMaxSize,
- final int threadPoolMaxSize,
+ final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
@@ -518,7 +518,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -108,7 +108,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
DEFAULT_MAX_RETRY_INTERVAL,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -104,7 +104,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
DEFAULT_MAX_RETRY_INTERVAL,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -333,7 +333,7 @@
dupsOkBatchSize,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
DEFAULT_MAX_RETRY_INTERVAL,
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -286,6 +286,8 @@
consumer.setMessageHandler(null);
consumer.receiveImmediate();
+
+ session.close();
}
public void testNoReceiveWithListener() throws Exception
@@ -321,6 +323,8 @@
fail("Wrong exception code");
}
}
+
+ session.close();
}
// https://jira.jboss.org/jira/browse/HORNETQ-111
Modified: trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -404,6 +404,8 @@
assertEquals(1, recoveredXids.length);
assertEquals(xid, recoveredXids[0]);
assertEquals(0, session.recover(XAResource.TMENDRSCAN).length);
+
+ session.close();
}
finally
{
@@ -504,7 +506,8 @@
{
assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
}
-
+
+ session.close();
}
finally
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -75,6 +75,8 @@
Thread.sleep(250);
assertFalse(ackReceived);
+
+ session.close();
}
public static class AckInterceptor implements Interceptor
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -963,6 +963,7 @@
{
public void connectionFailed(HornetQException me)
{
+ log.info("calling listener");
latch.countDown();
}
}
@@ -1019,6 +1020,8 @@
// Wait to be informed of failure
boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ log.info("waited for latch");
assertTrue(ok);
@@ -1032,6 +1035,8 @@
{
assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
+
+ //Thread.sleep(30000);
session1.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -1323,8 +1323,7 @@
{
final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
sf.setReconnectAttempts(-1);
- sf.setUseReattach(true);
-
+
return sf;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -48,7 +48,6 @@
protected ClientSessionFactoryInternal createSessionFactory()
{
final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.integration.transports.netty.NettyConnectorFactory"));
- sf.setUseReattach(true);
sf.setReconnectAttempts(-1);
return sf;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -222,8 +222,6 @@
sf.setReconnectAttempts(-1);
- sf.setUseReattach(true);
-
ClientSession session = sf.createSession(false, false, false);
Failer failer = startFailer(1000, session);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -79,7 +79,6 @@
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
@@ -157,7 +156,6 @@
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
@@ -247,7 +245,6 @@
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
@@ -357,7 +354,6 @@
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
@@ -434,7 +430,6 @@
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
@@ -502,7 +497,6 @@
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
@@ -594,7 +588,6 @@
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
@@ -673,7 +666,6 @@
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
sf.setMaxRetryInterval(maxRetryInterval);
- sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -96,7 +96,7 @@
}
finally
{
- //closeAllConsumers();
+ closeAllConsumers();
closeAllSessionFactories();
@@ -172,7 +172,7 @@
}
finally
{
- //closeAllConsumers();
+ closeAllConsumers();
closeAllSessionFactories();
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -166,7 +166,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
1000,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -177,6 +177,13 @@
//Make sure it doesn't pass by just timing out on blocking send
assertTrue(end - start < callTimeout);
+
+ System.gc();
+ System.gc();
+ System.gc();
+ System.gc();
+
+ Thread.sleep(30000);
}
@@ -276,7 +283,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
1000,
@@ -287,19 +294,23 @@
protected void disconnect()
{
+ log.info("calling disocnnect");
if (connection == null)
{
+ log.info("connection is null");
return;
}
try
{
connection.setExceptionListener(null);
+ log.info("closing the connection");
connection.close();
connection = null;
}
catch (Exception e)
{
+ log.info("** got exception");
e.printStackTrace();
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -240,7 +240,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
DEFAULT_MAX_RETRY_INTERVAL,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -280,7 +280,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
DEFAULT_MAX_RETRY_INTERVAL,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -17,8 +17,8 @@
import javax.jms.Connection;
import javax.jms.Session;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.tests.util.JMSTestBase;
@@ -32,6 +32,8 @@
*/
public class CloseConnectionOnGCTest extends JMSTestBase
{
+ private static final Logger log = Logger.getLogger(CloseConnectionOnGCTest.class);
+
private HornetQConnectionFactory cf;
@Override
@@ -52,9 +54,24 @@
super.tearDown();
}
+ public void testFoo() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ log.info("Iteration " + i);
+
+ testCloseOneConnectionOnGC();
+ }
+ }
public void testCloseOneConnectionOnGC() throws Exception
{
+ //Debug - don't remove this until intermittent failure with this test is fixed
+ int initialConns = server.getRemotingService().getConnections().size();
+
+ assertEquals(0, initialConns);
+
+
Connection conn = cf.createConnection();
WeakReference<Connection> wr = new WeakReference<Connection>(conn);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -179,7 +179,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
DEFAULT_MAX_RETRY_INTERVAL,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -59,8 +59,9 @@
// Attributes ----------------------------------------------------
protected InVMContext context;
+
private HornetQServer server;
-
+
private JMSServerManagerImpl serverManager;
// Static --------------------------------------------------------
@@ -128,7 +129,7 @@
checkNoBinding(context, queueJNDIBinding);
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
}
-
+
public void testGetQueueNames() throws Exception
{
String queueJNDIBinding = randomString();
@@ -185,7 +186,7 @@
checkNoBinding(context, topicJNDIBinding);
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(topicName));
}
-
+
public void testGetTopicNames() throws Exception
{
String topicJNDIBinding = randomString();
@@ -320,13 +321,9 @@
0,
TransportConstants.SERVER_ID_PROP_NAME,
1);
-
- control.createConnectionFactory(cfName,
- InVMConnectorFactory.class.getName() + ", " + InVMConnectorFactory.class.getName(),
- params,
- "",
- "",
- jndiBindings);
+
+ control.createConnectionFactory(cfName, InVMConnectorFactory.class.getName() + ", " +
+ InVMConnectorFactory.class.getName(), params, "", "", jndiBindings);
}
});
}
@@ -352,7 +349,7 @@
});
}
- // with 1 live and 1 backup
+ // with 1 live and 1 backup
public void testCreateConnectionFactory_4b() throws Exception
{
doCreateConnectionFactory(new ConnectionFactoryCreator()
@@ -431,7 +428,7 @@
clientID,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
@@ -458,7 +455,7 @@
}
});
}
-
+
public void testCreateConnectionFactory_7b() throws Exception
{
doCreateConnectionFactory(new ConnectionFactoryCreator()
@@ -476,7 +473,7 @@
clientID,
ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
@@ -566,7 +563,7 @@
}
}
-
+
public void testGetConnectionFactoryNames() throws Exception
{
String cfBinding = randomString();
@@ -574,14 +571,17 @@
JMSServerControl control = createManagementControl();
assertEquals(0, control.getConnectionFactoryNames().length);
-
+
TransportConfiguration tcLive = new TransportConfiguration(InVMConnectorFactory.class.getName());
- control.createConnectionFactory(cfName, tcLive.getFactoryClassName(), tcLive.getParams(), new String[] {cfBinding});
+ control.createConnectionFactory(cfName,
+ tcLive.getFactoryClassName(),
+ tcLive.getParams(),
+ new String[] { cfBinding });
String[] cfNames = control.getConnectionFactoryNames();
assertEquals(1, cfNames.length);
assertEquals(cfName, cfNames[0]);
-
+
control.destroyConnectionFactory(cfName);
assertEquals(0, control.getConnectionFactoryNames().length);
}
@@ -612,11 +612,11 @@
protected void tearDown() throws Exception
{
serverManager.stop();
-
+
server.stop();
-
+
serverManager = null;
-
+
server = null;
super.tearDown();
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -117,7 +117,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -150,7 +150,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -184,7 +184,7 @@
long initialWaitTimeout,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -217,7 +217,7 @@
initialWaitTimeout,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
@@ -399,7 +399,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -432,7 +432,7 @@
transactionBatchSize,
dupsOKBatchSize,
useGlobalPools,
- scheduledThreadPoolMaxSize,
+ scheduledThreadPoolMaxSize,
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
@@ -468,7 +468,7 @@
int dupsOKBatchSize,
boolean useGlobalPools,
int scheduledThreadPoolMaxSize,
- int threadPoolMaxSize,
+ int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
long maxRetryInterval,
@@ -502,7 +502,7 @@
dupsOKBatchSize,
useGlobalPools,
scheduledThreadPoolMaxSize,
- threadPoolMaxSize,
+ threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -526,6 +526,30 @@
//
}
}
+
+ if (clientSessionTxReceives != null)
+ {
+ try
+ {
+ clientSessionTxReceives.close();
+ }
+ catch (HornetQException e1)
+ {
+ //
+ }
+ }
+
+ if (clientSessionTxSends != null)
+ {
+ try
+ {
+ clientSessionTxSends.close();
+ }
+ catch (HornetQException e1)
+ {
+ //
+ }
+ }
if (server != null && server.isStarted())
{
try
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-10-13 09:02:45 UTC (rev 8082)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-10-13 10:06:28 UTC (rev 8083)
@@ -219,7 +219,7 @@
DEFAULT_ACK_BATCH_SIZE,
DEFAULT_USE_GLOBAL_POOLS,
DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
DEFAULT_MAX_RETRY_INTERVAL,
14 years, 7 months
JBoss hornetq SVN: r8082 - in trunk: src/main/org/hornetq/core/remoting/impl and 5 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-13 05:02:45 -0400 (Tue, 13 Oct 2009)
New Revision: 8082
Added:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-50: ReceiveImmediate on consumer should check the server for messages
* added packet SessionForceConsumerDelivery. when it is sent one-way by the client, the server will prompt the queue
for delivery and sent back to the consumer a "force delivery" message
* in ClientConsumerImpl.receive(long, boolean), added a boolean forcingDelivery to force the delivery and wait for a "forced delivery"
message which is discarded when it is received (and receive() returns null)
* ClientConsumer.receiveImmediate() is the only receive*() method which forces message delivery
* JMS HornetQQueueBrowser uses ClientConsumer.receiveImmediate() to check if there are more messages to browse on the queue
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -15,6 +15,7 @@
import java.io.File;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientMessage;
@@ -52,6 +53,8 @@
public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
public static final int NUM_PRIORITIES = 10;
+
+ public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
// Attributes
// -----------------------------------------------------------------------------------
@@ -108,6 +111,8 @@
private boolean stopped = false;
+ private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -146,7 +151,7 @@
// ClientConsumer implementation
// -----------------------------------------------------------------
- public ClientMessage receive(long timeout) throws HornetQException
+ private ClientMessage receive(long timeout, boolean forcingDelivery) throws HornetQException
{
checkClosed();
@@ -181,6 +186,8 @@
timeout = Long.MAX_VALUE;
}
+ boolean deliveryForced = false;
+
long start = -1;
long toWait = timeout;
@@ -194,13 +201,22 @@
synchronized (this)
{
while ((stopped || (m = buffer.removeFirst()) == null) && !closed && toWait > 0)
-
{
if (start == -1)
{
start = System.currentTimeMillis();
}
+ if (m == null && forcingDelivery)
+ {
+ // we only force delivery once per call to receive
+ if (!deliveryForced)
+ {
+ session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
+ deliveryForced = true;
+ }
+ }
+
try
{
wait(toWait);
@@ -213,6 +229,11 @@
{
break;
}
+
+ if (forcingDelivery && stopped)
+ {
+ break;
+ }
long now = System.currentTimeMillis();
@@ -224,6 +245,20 @@
if (m != null)
{
+ if (m.containsProperty(FORCED_DELIVERY_MESSAGE))
+ {
+ Long seq = (Long)m.getProperty(FORCED_DELIVERY_MESSAGE);
+ if (seq >= forceDeliveryCount.longValue())
+ {
+ // forced delivery messages are discarded, nothing has been delivered by the queue
+ return null;
+ }
+ else
+ {
+ // ignore any previous forced delivery message
+ continue;
+ }
+ }
// if we have already pre acked we cant expire
boolean expired = m.isExpired();
@@ -269,14 +304,19 @@
}
}
+ public ClientMessage receive(long timeout) throws HornetQException
+ {
+ return receive(timeout, false);
+ }
+
public ClientMessage receive() throws HornetQException
{
- return receive(0);
+ return receive(0, false);
}
public ClientMessage receiveImmediate() throws HornetQException
{
- return receive(-1);
+ return receive(0, true);
}
public MessageHandler getMessageHandler() throws HornetQException
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -211,6 +211,17 @@
return true;
}
}
+
+ @Override
+ public String toString()
+ {
+ return "ClientMessage[messageID=" + messageID +
+ ", durable=" +
+ durable +
+ ", destination=" +
+ getDestination() +
+ "]";
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -49,6 +49,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -329,7 +330,16 @@
return response;
}
+
+ public void forceDelivery(long consumerID, long sequence) throws HornetQException
+ {
+ checkClosed();
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+
+ channel.send(request);
+ }
+
public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
{
return createConsumer(queueName, null, false);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -68,4 +68,6 @@
FailoverManager getConnectionManager();
void workDone();
+
+ void forceDelivery(long consumerID, long sequence) throws HornetQException;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -119,6 +119,11 @@
return session.bindingQuery(address);
}
+ public void forceDelivery(long consumerID, long sequence) throws HornetQException
+ {
+ session.forceDelivery(consumerID, sequence);
+ }
+
public void cleanUp() throws Exception
{
session.cleanUp();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -30,6 +30,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -77,6 +78,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -350,6 +352,11 @@
packet = new SessionSendContinuationMessage();
break;
}
+ case SESS_FORCE_CONSUMER_DELIVERY:
+ {
+ packet = new SessionForceConsumerDelivery();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -137,6 +137,8 @@
public static final byte SESS_RECEIVE_CONTINUATION = 76;
+ public static final byte SESS_FORCE_CONSUMER_DELIVERY = 77;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionForceConsumerDelivery.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ *
+ * A SessionConsumerForceDelivery
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class SessionForceConsumerDelivery extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+ private long sequence;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionForceConsumerDelivery(final long consumerID, final long sequence)
+ {
+ super(SESS_FORCE_CONSUMER_DELIVERY);
+
+ this.consumerID = consumerID;
+ this.sequence = sequence;
+ }
+
+ public SessionForceConsumerDelivery()
+ {
+ super(SESS_FORCE_CONSUMER_DELIVERY);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getSequence()
+ {
+ return sequence;
+ }
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
+ }
+
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+ buffer.writeLong(sequence);
+ }
+
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ consumerID = buffer.readLong();
+ sequence = buffer.readLong();
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getParentString());
+ buf.append(", consumerID=" + consumerID);
+ buf.append(", sequence=" + sequence);
+ buf.append("]");
+ return buf.toString();
+ }
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof SessionForceConsumerDelivery == false)
+ {
+ return false;
+ }
+
+ SessionForceConsumerDelivery r = (SessionForceConsumerDelivery)other;
+
+ return super.equals(other) && this.consumerID == r.consumerID && this.sequence == r.sequence;
+ }
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -30,8 +30,6 @@
void close() throws Exception;
- int getCountOfPendingDeliveries();
-
List<MessageReference> cancelRefs(boolean lastConsumedAsDelivered, Transaction tx) throws Exception;
void setStarted(boolean started);
@@ -43,4 +41,6 @@
MessageReference getExpired(long messageID) throws Exception;
void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
+
+ void forceDelivery(long sequence);
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -21,6 +21,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -66,7 +67,7 @@
void close() throws Exception;
- void promptDelivery(Queue queue);
+ void promptDelivery(Queue queue, boolean async);
void handleAcknowledge(final SessionAcknowledgeMessage packet);
@@ -124,6 +125,8 @@
void handleSendLargeMessage(SessionSendLargeMessage packet);
+ void handleForceConsumerDelivery(SessionForceConsumerDelivery message);
+
void handleClose(Packet packet);
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -280,10 +280,9 @@
}
}
- // Only used in testing - do not call directly!
public synchronized void deliverNow()
{
- deliver();
+ deliverRunner.run();
}
public synchronized void addConsumer(final Consumer consumer) throws Exception
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -22,6 +22,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
@@ -46,7 +47,6 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
/**
@@ -255,9 +255,30 @@
}
}
- public int getCountOfPendingDeliveries()
- {
- return deliveringRefs.size();
+ /**
+ * Prompt delivery and send a "forced delivery" message to the consumer.
+ * When the consumer receives such a "forced delivery" message, it discards it
+ * and knows that there are no other messages to be delivered.
+ */
+ public synchronized void forceDelivery(final long sequence)
+ {
+ // The prompt delivery is called synchronously to ensure the "forced delivery" message is
+ // sent after any queue delivery.
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ promptDelivery(false);
+
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+ forcedDeliveryMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setDestination(messageQueue.getName());
+
+ final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+ channel.send(packet);
+ }
+ });
}
public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
@@ -305,7 +326,7 @@
// Outside the lock
if (started)
{
- promptDelivery();
+ promptDelivery(true);
}
}
@@ -331,7 +352,7 @@
if (previous <= 0 && previous + credits > 0)
{
- promptDelivery();
+ promptDelivery(true);
}
}
}
@@ -420,7 +441,7 @@
// Private --------------------------------------------------------------------------------------
- private void promptDelivery()
+ private void promptDelivery(boolean asyncDelivery)
{
lock.lock();
try
@@ -435,11 +456,18 @@
{
if (browseOnly)
{
- executor.execute(browserDeliverer);
+ if (asyncDelivery)
+ {
+ executor.execute(browserDeliverer);
+ }
+ else
+ {
+ browserDeliverer.run();
+ }
}
else
{
- session.promptDelivery(messageQueue);
+ session.promptDelivery(messageQueue, asyncDelivery);
}
}
}
@@ -590,7 +618,7 @@
else
{
// prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
+ session.promptDelivery(messageQueue, true);
}
}
}
@@ -865,7 +893,7 @@
}
catch (Exception e)
{
- log.warn("Exception while browser handled from " + messageQueue + ": " + current);
+ log.warn("Exception while browser handled from " + messageQueue + ": " + current, e);
return;
}
}
@@ -886,7 +914,7 @@
}
catch (Exception e)
{
- log.warn("Exception while browser handled from " + messageQueue + ": " + ref);
+ log.warn("Exception while browser handled from " + messageQueue + ": " + ref, e);
break;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -56,6 +56,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -320,9 +321,16 @@
remotingConnection.removeFailureListener(this);
}
- public void promptDelivery(final Queue queue)
+ public void promptDelivery(final Queue queue, boolean async)
{
- queue.deliverAsync(executor);
+ if (async)
+ {
+ queue.deliverAsync(executor);
+ }
+ else
+ {
+ queue.deliverNow();
+ }
}
public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
@@ -626,7 +634,21 @@
channel.send(response);
}
+
+ public void handleForceConsumerDelivery(SessionForceConsumerDelivery message)
+ {
+ try
+ {
+ ServerConsumer consumer = consumers.get(message.getConsumerID());
+ consumer.forceDelivery(message.getSequence());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to query consumer deliveries", e);
+ }
+ }
+
public void handleAcknowledge(final SessionAcknowledgeMessage packet)
{
Packet response = null;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -20,6 +20,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -51,6 +52,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
@@ -267,6 +269,12 @@
session.handleSendContinuations(message);
break;
}
+ case SESS_FORCE_CONSUMER_DELIVERY:
+ {
+ SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
+ session.handleForceConsumerDelivery(message);
+ break;
+ }
}
}
catch (Throwable t)
Modified: trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java 2009-10-12 18:37:50 UTC (rev 8081)
+++ trunk/src/main/org/hornetq/jms/client/HornetQQueueBrowser.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -32,8 +32,8 @@
/**
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * <p/>
- * $Id$
+ *
+ * $Id$
*/
public class HornetQQueueBrowser implements QueueBrowser
{
@@ -41,8 +41,6 @@
private static final Logger log = Logger.getLogger(HornetQQueueBrowser.class);
- private static final long NEXT_MESSAGE_TIMEOUT = 1000;
-
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
@@ -136,9 +134,7 @@
{
try
{
- // todo change this to consumer.receiveImmediate() once
- // https://jira.jboss.org/jira/browse/JBMESSAGING-1432 is completed
- current = consumer.receive(NEXT_MESSAGE_TIMEOUT);
+ current = consumer.receiveImmediate();
}
catch (HornetQException e)
{
Added: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java 2009-10-13 09:02:45 UTC (rev 8082)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ReceiveImmediateTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ReceiveImmediateTest.class);
+
+ private HornetQServer server;
+
+ private final SimpleString QUEUE = new SimpleString("ReceiveImmediateTest.queue");
+
+ private final SimpleString ADDRESS = new SimpleString("ReceiveImmediateTest.address");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = createDefaultConfig(false);
+ server = HornetQ.newHornetQServer(config, false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ private ClientSessionFactory sf;
+
+ public void testConsumerReceiveImmediateWithNoMessages() throws Exception
+ {
+ doConsumerReceiveImmediateWithNoMessages(false);
+ }
+
+ public void testConsumerReceiveImmediate() throws Exception
+ {
+ doConsumerReceiveImmediate(false);
+ }
+
+ public void testBrowserReceiveImmediateWithNoMessages() throws Exception
+ {
+ doConsumerReceiveImmediateWithNoMessages(true);
+ }
+
+ public void testBrowserReceiveImmediate() throws Exception
+ {
+ doConsumerReceiveImmediate(true);
+ }
+
+ private void doConsumerReceiveImmediateWithNoMessages(boolean browser) throws Exception
+ {
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setAckBatchSize(0);
+
+ ClientSession session = sf.createSession(false, true, false);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, browser);
+ session.start();
+
+ ClientMessage message = consumer.receiveImmediate();
+ assertNull(message);
+
+ session.close();
+
+ sf.close();
+ }
+
+ private void doConsumerReceiveImmediate(boolean browser) throws Exception
+ {
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setAckBatchSize(0);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, browser);
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receiveImmediate();
+ assertNotNull("did not receive message " + i, message2);
+ assertEquals("m" + i, message2.getBody().readString());
+ if (!browser)
+ {
+ message2.acknowledge();
+ }
+ }
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+
+ assertNull(consumer.receiveImmediate());
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ int messagesOnServer = (browser ? numMessages : 0);
+ assertEquals(messagesOnServer, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+
+ consumer.close();
+
+ session.close();
+
+ sf.close();
+
+ }
+
+}
14 years, 7 months
JBoss hornetq SVN: r8081 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-12 14:37:50 -0400 (Mon, 12 Oct 2009)
New Revision: 8081
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
Log:
Changing defaults on sync for the test
Modified: trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-10-12 15:28:06 UTC (rev 8080)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-10-12 18:37:50 UTC (rev 8081)
@@ -115,6 +115,7 @@
for (int i = 0; i < 200; i++)
{
System.out.println("Iteration " + i);
+ // Sending non transactionally, so it would test non transactional stuff on the journal
for (int j = 0; j < 1000; j++)
{
Message msg = session.createClientMessage(true);
@@ -123,6 +124,7 @@
prod.send(msg);
}
+ // I need to guarantee a roundtrip to the server, to make sure everything is persisted
session.commit();
for (int j = 0; j < 1000; j++)
@@ -132,6 +134,7 @@
msg.acknowledge();
}
+ // I need to guarantee a roundtrip to the server, to make sure everything is persisted
session.commit();
}
@@ -419,6 +422,7 @@
private void setupServer(JournalType journalType) throws Exception, HornetQException
{
Configuration config = createDefaultConfig();
+ config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
config.setJournalType(journalType);
@@ -431,6 +435,8 @@
server.start();
sf = createInVMFactory();
+ sf.setBlockOnPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
ClientSession sess = sf.createSession();
14 years, 7 months
JBoss hornetq SVN: r8080 - in trunk: src/main/org/hornetq/core/management/impl and 3 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-12 11:28:06 -0400 (Mon, 12 Oct 2009)
New Revision: 8080
Modified:
trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-163: clean up the FilterImpl creation
* null & empty strings returns a null filter, otherwise create a new FilterImpl
* flagged FilterImpl ctor as private and use FilterImpl.createFilter() methods instead
* Removed redundant QueueControlImpl.createFilter
-- patch provided by Sergej Zizemski
Modified: trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
+++ trunk/src/main/org/hornetq/core/filter/impl/FilterImpl.java 2009-10-12 15:28:06 UTC (rev 8080)
@@ -88,28 +88,33 @@
// Static ---------------------------------------------------------
/**
- * @return null if <code>filterStr</code> is null or a valid filter else
+ * @return null if <code>filterStr</code> is null or an empty String and a valid filter else
* @throws HornetQException if the string does not correspond to a valid filter
*/
public static Filter createFilter(final String filterStr) throws HornetQException
{
- Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
- return filter;
+ return createFilter(SimpleString.toSimpleString(filterStr));
}
/**
- * @return null if <code>filterStr</code> is null or a valid filter else
+ * @return null if <code>filterStr</code> is null or an empty String and a valid filter else
* @throws HornetQException if the string does not correspond to a valid filter
*/
public static Filter createFilter(final SimpleString filterStr) throws HornetQException
{
- Filter filter = filterStr == null ? null : new FilterImpl(filterStr);
- return filter;
+ if (filterStr == null || filterStr.length() == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return new FilterImpl(filterStr);
+ }
}
// Constructors ---------------------------------------------------
- public FilterImpl(final SimpleString str) throws HornetQException
+ private FilterImpl(final SimpleString str) throws HornetQException
{
sfilterString = str;
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-10-12 15:28:06 UTC (rev 8080)
@@ -74,21 +74,6 @@
return array.toString();
}
- /**
- * Returns null if the string is null or empty
- */
- public static Filter createFilter(final String filterStr) throws HornetQException
- {
- if (filterStr == null || filterStr.trim().length() == 0)
- {
- return null;
- }
- else
- {
- return new FilterImpl(new SimpleString(filterStr));
- }
- }
-
// Constructors --------------------------------------------------
public QueueControlImpl(final Queue queue,
@@ -243,7 +228,7 @@
{
try
{
- Filter filter = createFilter(filterStr);
+ Filter filter = FilterImpl.createFilter(filterStr);
List<MessageReference> refs = queue.list(filter);
Map<String, Object>[] messages = new Map[refs.size()];
int i = 0;
@@ -267,7 +252,7 @@
public int countMessages(final String filterStr) throws Exception
{
- Filter filter = createFilter(filterStr);
+ Filter filter = FilterImpl.createFilter(filterStr);
List<MessageReference> refs = queue.list(filter);
return refs.size();
}
@@ -286,7 +271,7 @@
public int removeMessages(final String filterStr) throws Exception
{
- Filter filter = createFilter(filterStr);
+ Filter filter = FilterImpl.createFilter(filterStr);
return queue.deleteMatchingReferences(filter);
}
@@ -299,7 +284,7 @@
{
try
{
- Filter filter = createFilter(filterStr);
+ Filter filter = FilterImpl.createFilter(filterStr);
return queue.expireReferences(filter);
}
catch (HornetQException e)
@@ -322,7 +307,7 @@
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
{
- Filter filter = createFilter(filterStr);
+ Filter filter = FilterImpl.createFilter(filterStr);
Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
@@ -336,7 +321,7 @@
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
{
- Filter filter = createFilter(filterStr);
+ Filter filter = FilterImpl.createFilter(filterStr);
List<MessageReference> refs = queue.list(filter);
@@ -355,7 +340,7 @@
public int changeMessagesPriority(String filterStr, int newPriority) throws Exception
{
- Filter filter = createFilter(filterStr);
+ Filter filter = FilterImpl.createFilter(filterStr);
List<MessageReference> refs = queue.list(filter);
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-12 15:28:06 UTC (rev 8080)
@@ -90,14 +90,7 @@
this.remoteQueueID = remoteQueueID;
- if (filterString != null)
- {
- queueFilter = new FilterImpl(filterString);
- }
- else
- {
- queueFilter = null;
- }
+ queueFilter = FilterImpl.createFilter(filterString);
this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
@@ -228,7 +221,7 @@
{
filterCounts.put(filterString, 0);
- filters.add(new FilterImpl(filterString));
+ filters.add(FilterImpl.createFilter(filterString));
}
else
{
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-12 15:28:06 UTC (rev 8080)
@@ -1128,13 +1128,8 @@
for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
{
- Filter filter = null;
+ Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
- if (queueBindingInfo.getFilterString() != null)
- {
- filter = new FilterImpl(queueBindingInfo.getFilterString());
- }
-
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
@@ -1226,13 +1221,8 @@
}
}
- Filter filter = null;
+ Filter filter = FilterImpl.createFilter(filterString);
- if (filterString != null)
- {
- filter = new FilterImpl(filterString);
- }
-
final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(),
address,
queueName,
@@ -1293,13 +1283,8 @@
Transformer transformer = instantiateTransformer(config.getTransformerClassName());
- Filter filter = null;
+ Filter filter = FilterImpl.createFilter(config.getFilterString());
- if (config.getFilterString() != null)
- {
- filter = new FilterImpl(new SimpleString(config.getFilterString()));
- }
-
Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
sName,
new SimpleString(config.getRoutingName()),
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-12 15:28:06 UTC (rev 8080)
@@ -346,13 +346,8 @@
securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
- Filter filter = null;
+ Filter filter = FilterImpl.createFilter(filterString);;
- if (filterString != null)
- {
- filter = new FilterImpl(filterString);
- }
-
ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
this,
(QueueBinding)binding,
Modified: trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-10-12 13:51:48 UTC (rev 8079)
+++ trunk/tests/src/org/hornetq/tests/unit/core/filter/impl/FilterTest.java 2009-10-12 15:28:06 UTC (rev 8080)
@@ -45,7 +45,7 @@
public void testFilterForgets() throws Exception
{
- filter = new FilterImpl(new SimpleString("color = 'RED'"));
+ filter = FilterImpl.createFilter(new SimpleString("color = 'RED'"));
message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
assertTrue(filter.match(message));
@@ -57,17 +57,26 @@
public void testInvalidString() throws Exception
{
testInvalidFilter("invalid");
+ testInvalidFilter(new SimpleString("invalid"));
testInvalidFilter("color = 'red");
+ testInvalidFilter(new SimpleString("color = 'red"));
testInvalidFilter("3");
-
- testInvalidFilter(null);
+ testInvalidFilter(new SimpleString("3"));
}
+ public void testNullFilter() throws Exception
+ {
+ assertNull(FilterImpl.createFilter((String)null));
+ assertNull(FilterImpl.createFilter(""));
+ assertNull(FilterImpl.createFilter((SimpleString)null));
+ assertNull(FilterImpl.createFilter(new SimpleString("")));
+ }
+
public void testHQDurable() throws Exception
{
- filter = new FilterImpl(new SimpleString("HQDurable='DURABLE'"));
+ filter = FilterImpl.createFilter(new SimpleString("HQDurable='DURABLE'"));
message.setDurable(true);
@@ -77,7 +86,7 @@
assertFalse(filter.match(message));
- filter = new FilterImpl(new SimpleString("HQDurable='NON_DURABLE'"));
+ filter = FilterImpl.createFilter(new SimpleString("HQDurable='NON_DURABLE'"));
message = new ServerMessageImpl();
message.setDurable(true);
@@ -96,8 +105,8 @@
message.setBody(ChannelBuffers.wrappedBuffer(RandomUtil.randomBytes(1)));
assertTrue(message.getEncodeSize() < 1024);
- Filter moreThan128 = new FilterImpl(new SimpleString("HQSize > 128"));
- Filter lessThan1024 = new FilterImpl(new SimpleString("HQSize < 1024"));
+ Filter moreThan128 = FilterImpl.createFilter(new SimpleString("HQSize > 128"));
+ Filter lessThan1024 = FilterImpl.createFilter(new SimpleString("HQSize < 1024"));
assertFalse(moreThan128.match(message));
assertTrue(lessThan1024.match(message));
@@ -111,7 +120,7 @@
public void testHQPriority() throws Exception
{
- filter = new FilterImpl(new SimpleString("HQPriority=3"));
+ filter = FilterImpl.createFilter(new SimpleString("HQPriority=3"));
for (int i = 0; i < 10; i++)
{
@@ -130,7 +139,7 @@
public void testHQTimestamp() throws Exception
{
- filter = new FilterImpl(new SimpleString("HQTimestamp=12345678"));
+ filter = FilterImpl.createFilter(new SimpleString("HQTimestamp=12345678"));
message.setTimestamp(87654321);
@@ -143,31 +152,31 @@
public void testBooleanTrue() throws Exception
{
- filter = new FilterImpl(new SimpleString("MyBoolean=true"));
+ filter = FilterImpl.createFilter(new SimpleString("MyBoolean=true"));
testBoolean("MyBoolean", true);
}
public void testDifferentNullString() throws Exception
{
- filter = new FilterImpl(new SimpleString("prop <> 'foo'"));
+ filter = FilterImpl.createFilter(new SimpleString("prop <> 'foo'"));
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("NOT (prop = 'foo')"));
+ filter = FilterImpl.createFilter(new SimpleString("NOT (prop = 'foo')"));
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("prop <> 'foo'"));
+ filter = FilterImpl.createFilter(new SimpleString("prop <> 'foo'"));
doPutStringProperty("prop", "bar");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("prop <> 'foo'"));
+ filter = FilterImpl.createFilter(new SimpleString("prop <> 'foo'"));
doPutStringProperty("prop", "foo");
assertFalse(filter.match(message));
}
public void testBooleanFalse() throws Exception
{
- filter = new FilterImpl(new SimpleString("MyBoolean=false"));
+ filter = FilterImpl.createFilter(new SimpleString("MyBoolean=false"));
testBoolean("MyBoolean", false);
}
@@ -183,7 +192,7 @@
public void testStringEquals() throws Exception
{
// First, simple test of string equality and inequality
- filter = new FilterImpl(new SimpleString("MyString='astring'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString='astring'"));
doPutStringProperty("MyString", "astring");
assertTrue(filter.match(message));
@@ -192,7 +201,7 @@
assertTrue(!filter.match(message));
// test empty string
- filter = new FilterImpl(new SimpleString("MyString=''"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString=''"));
doPutStringProperty("MyString", "");
assertTrue("test 1", filter.match(message));
@@ -202,7 +211,7 @@
// test literal apostrophes (which are escaped using two apostrophes
// in selectors)
- filter = new FilterImpl(new SimpleString("MyString='test JBoss''s filter'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString='test JBoss''s filter'"));
// note: apostrophes are not escaped in string properties
doPutStringProperty("MyString", "test JBoss's filter");
@@ -217,7 +226,7 @@
public void testStringLike() throws Exception
{
// test LIKE operator with no wildcards
- filter = new FilterImpl(new SimpleString("MyString LIKE 'astring'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'astring'"));
assertFalse(filter.match(message));
// test where LIKE operand matches
@@ -225,17 +234,17 @@
assertTrue(filter.match(message));
// test one character string
- filter = new FilterImpl(new SimpleString("MyString LIKE 'a'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'a'"));
doPutStringProperty("MyString","a");
assertTrue(filter.match(message));
// test empty string
- filter = new FilterImpl(new SimpleString("MyString LIKE ''"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE ''"));
doPutStringProperty("MyString", "");
assertTrue(filter.match(message));
// tests where operand does not match
- filter = new FilterImpl(new SimpleString("MyString LIKE 'astring'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'astring'"));
// test with extra characters at beginning
doPutStringProperty("MyString", "NOTastring");
@@ -271,7 +280,7 @@
// matches any single character
// first, some tests with the wildcard by itself
- filter = new FilterImpl(new SimpleString("MyString LIKE '_'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '_'"));
assertFalse(filter.match(message));
// test match against single character
@@ -288,7 +297,7 @@
// next, tests with wildcard at the beginning of the string
- filter = new FilterImpl(new SimpleString("MyString LIKE '_bcdf'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '_bcdf'"));
// test match at beginning of string
doPutStringProperty("MyString", "abcdf");
@@ -323,7 +332,7 @@
assertTrue(!filter.match(message));
// next, tests with wildcard at the end of the string
- filter = new FilterImpl(new SimpleString("MyString LIKE 'abcd_'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'abcd_'"));
// test match at end of string
doPutStringProperty("MyString", "abcdf");
@@ -360,7 +369,7 @@
// test match in middle of string
// next, tests with wildcard in the middle of the string
- filter = new FilterImpl(new SimpleString("MyString LIKE 'ab_df'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'ab_df'"));
// test match in the middle of string
doPutStringProperty("MyString", "abcdf");
@@ -400,7 +409,7 @@
public void testNotLikeExpression() throws Exception
{
//Should evaluate to true since the property MyString does not exist
- filter = new FilterImpl(new SimpleString("NOT (MyString LIKE '%')"));
+ filter = FilterImpl.createFilter(new SimpleString("NOT (MyString LIKE '%')"));
assertTrue(filter.match(message));
}
@@ -413,7 +422,7 @@
// first, some tests with the wildcard by itself
- filter = new FilterImpl(new SimpleString("MyString LIKE '%'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '%'"));
assertFalse(filter.match(message));
// test match against single character
@@ -433,7 +442,7 @@
// next, tests with wildcard at the beginning of the string
- filter = new FilterImpl(new SimpleString("MyString LIKE '%bcdf'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '%bcdf'"));
// test match with single character at beginning of string
doPutStringProperty("MyString", "Xbcdf");
@@ -464,7 +473,7 @@
assertTrue(filter.match(message));
// next, tests with wildcard at the end of the string
- filter = new FilterImpl(new SimpleString("MyString LIKE 'abcd%'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'abcd%'"));
// test match of single character at end of string
doPutStringProperty("MyString", "abcdf");
@@ -499,7 +508,7 @@
assertTrue(filter.match(message));
// next, tests with wildcard in the middle of the string
- filter = new FilterImpl(new SimpleString("MyString LIKE 'ab%df'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'ab%df'"));
// test match with single character in the middle of string
doPutStringProperty("MyString", "abXdf");
@@ -551,7 +560,7 @@
// wildcards of the current underlying RE engine,
// GNU regexp.
- filter = new FilterImpl(new SimpleString("MyString LIKE 'a^$b'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'a^$b'"));
assertFalse(filter.match(message));
doPutStringProperty("MyString", "a^$b");
@@ -559,67 +568,67 @@
// this one has a double backslash since backslash
// is interpreted specially by Java
- filter = new FilterImpl(new SimpleString("MyString LIKE 'a\\dc'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'a\\dc'"));
doPutStringProperty("MyString", "a\\dc");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE 'a.c'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'a.c'"));
doPutStringProperty("MyString", "abc");
assertTrue(!filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '[abc]'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '[abc]'"));
doPutStringProperty("MyString", "[abc]");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '[^abc]'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '[^abc]'"));
doPutStringProperty("MyString", "[^abc]");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '[a-c]'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '[a-c]'"));
doPutStringProperty("MyString", "[a-c]");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '[:alpha]'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '[:alpha]'"));
doPutStringProperty("MyString", "[:alpha]");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(abc)'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(abc)'"));
doPutStringProperty("MyString", "(abc)");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE 'a|bc'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE 'a|bc'"));
doPutStringProperty("MyString", "a|bc");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(abc)?'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(abc)?'"));
doPutStringProperty("MyString", "(abc)?");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(abc)*'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(abc)*'"));
doPutStringProperty("MyString", "(abc)*");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(abc)+'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(abc)+'"));
doPutStringProperty("MyString", "(abc)+");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(abc){3}'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(abc){3}'"));
doPutStringProperty("MyString", "(abc){3}");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(abc){3,5}'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(abc){3,5}'"));
doPutStringProperty("MyString", "(abc){3,5}");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(abc){3,}'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(abc){3,}'"));
doPutStringProperty("MyString", "(abc){3,}");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(?=abc)'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(?=abc)'"));
doPutStringProperty("MyString", "(?=abc)");
assertTrue(filter.match(message));
- filter = new FilterImpl(new SimpleString("MyString LIKE '(?!abc)'"));
+ filter = FilterImpl.createFilter(new SimpleString("MyString LIKE '(?!abc)'"));
doPutStringProperty("MyString", "(?!abc)");
assertTrue(filter.match(message));
}
@@ -636,15 +645,7 @@
{
try
{
- if (filterString != null)
- {
- filter = new FilterImpl(new SimpleString(filterString));
- }
- else
- {
- filter = new FilterImpl(null);
- }
-
+ filter = FilterImpl.createFilter(filterString);
fail("Should throw exception");
}
catch (HornetQException e)
@@ -653,4 +654,17 @@
}
}
+ private void testInvalidFilter(SimpleString filterString) throws Exception
+ {
+ try
+ {
+ filter = FilterImpl.createFilter(filterString);
+ fail("Should throw exception");
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(HornetQException.INVALID_FILTER_EXPRESSION, e.getCode());
+ }
+ }
+
}
14 years, 7 months
JBoss hornetq SVN: r8079 - in trunk: src/main/org/hornetq/core/management/impl and 2 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-12 09:51:48 -0400 (Mon, 12 Oct 2009)
New Revision: 8079
Modified:
trunk/src/main/org/hornetq/core/management/AddressControl.java
trunk/src/main/org/hornetq/core/management/ManagementService.java
trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-174: add page store information to the AddressControl management API
* added getNumberOfBytesPerPage() & getNumberOfPages() methods to AddressControl
-- patch contributed by Bijith Kumar
Modified: trunk/src/main/org/hornetq/core/management/AddressControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/AddressControl.java 2009-10-10 23:35:44 UTC (rev 8078)
+++ trunk/src/main/org/hornetq/core/management/AddressControl.java 2009-10-12 13:51:48 UTC (rev 8079)
@@ -31,6 +31,10 @@
String getRolesAsJSON() throws Exception;
String[] getQueueNames() throws Exception;
+
+ int getNumberOfPages() throws Exception;
+
+ long getNumberOfBytesPerPage() throws Exception;
// Operations ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ManagementService.java 2009-10-10 23:35:44 UTC (rev 8078)
+++ trunk/src/main/org/hornetq/core/management/ManagementService.java 2009-10-12 13:51:48 UTC (rev 8079)
@@ -28,6 +28,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.messagecounter.MessageCounterManager;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.server.RemotingService;
@@ -81,6 +82,7 @@
HornetQServer messagingServer,
QueueFactory queueFactory,
ScheduledExecutorService scheduledThreadPool,
+ final PagingManager pagingManager,
boolean backup) throws Exception;
void unregisterServer() throws Exception;
Modified: trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
+++ trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
@@ -20,6 +20,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AddressControl;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -48,6 +49,8 @@
private final SimpleString address;
private final PostOffice postOffice;
+
+ private final PagingManager pagingManager;
private final HierarchicalRepository<Set<Role>> securityRepository;
@@ -57,12 +60,14 @@
public AddressControlImpl(final SimpleString address,
final PostOffice postOffice,
+ final PagingManager pagingManager,
final HierarchicalRepository<Set<Role>> securityRepository)
throws Exception
{
super(AddressControl.class);
this.address = address;
this.postOffice = postOffice;
+ this.pagingManager = pagingManager;
this.securityRepository = securityRepository;
}
@@ -126,7 +131,17 @@
}
return json.toString();
}
+
+ public long getNumberOfBytesPerPage() throws Exception
+ {
+ return pagingManager.getPageStore(address).getPageSizeBytes();
+ }
+ public int getNumberOfPages() throws Exception
+ {
+ return pagingManager.getPageStore(address).getNumberOfPages();
+ }
+
public synchronized void addRole(final String name,
final boolean send,
final boolean consume,
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
@@ -54,6 +54,7 @@
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.MessageCounterManager;
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.server.RemotingService;
@@ -97,6 +98,8 @@
private PostOffice postOffice;
+ private PagingManager pagingManager;
+
private StorageManager storageManager;
private HornetQServer messagingServer;
@@ -187,6 +190,7 @@
final HornetQServer messagingServer,
final QueueFactory queueFactory,
final ScheduledExecutorService scheduledThreadPool,
+ final PagingManager pagingManager,
final boolean backup) throws Exception
{
this.postOffice = postOffice;
@@ -194,6 +198,7 @@
this.securityRepository = securityRepository;
this.storageManager = storageManager;
this.messagingServer = messagingServer;
+ this.pagingManager = pagingManager;
this.messageCounterManager = new MessageCounterManagerImpl(scheduledThreadPool);
messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
@@ -223,7 +228,7 @@
public synchronized void registerAddress(final SimpleString address) throws Exception
{
ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
- AddressControlImpl addressControl = new AddressControlImpl(address, postOffice, securityRepository);
+ AddressControlImpl addressControl = new AddressControlImpl(address, postOffice, pagingManager, securityRepository);
registerInJMX(objectName, addressControl);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-12 13:51:48 UTC (rev 8079)
@@ -999,6 +999,7 @@
this,
queueFactory,
scheduledPool,
+ pagingManager,
configuration.isBackup());
// Address settings need to deployed initially, since they're require on paging manager.start()
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2009-10-10 23:35:44 UTC (rev 8078)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2009-10-12 13:51:48 UTC (rev 8079)
@@ -20,6 +20,8 @@
import java.util.HashSet;
import java.util.Set;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
@@ -34,6 +36,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.utils.SimpleString;
/**
@@ -330,7 +333,84 @@
session.deleteQueue(queue);
}
+
+ public void testGetNumberOfPages() throws Exception
+ {
+ session.close();
+ server.stop();
+
+ SimpleString address = randomSimpleString();
+
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setPageSizeBytes(1024);
+ addressSettings.setMaxSizeBytes(10 * 1024);
+ int NUMBER_MESSAGES_BEFORE_PAGING = 14;
+
+ server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+ server.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ session = sf.createSession(false, true, false);
+ session.start();
+ session.createQueue(address, address, true);
+
+ ClientProducer producer = session.createProducer(address);
+
+ for (int i = 0; i < NUMBER_MESSAGES_BEFORE_PAGING; i++)
+ {
+ ClientMessage msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[512]);
+ producer.send(msg);
+ }
+ session.commit();
+
+ AddressControl addressControl = createManagementControl(address);
+ assertEquals(0, addressControl.getNumberOfPages());
+
+ ClientMessage msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[512]);
+ producer.send(msg);
+ session.commit();
+ assertEquals(1, addressControl.getNumberOfPages());
+
+ msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[512]);
+ producer.send(msg);
+
+ session.commit();
+ assertEquals(1, addressControl.getNumberOfPages());
+
+ msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[512]);
+ producer.send(msg);
+
+ session.commit();
+ assertEquals(2, addressControl.getNumberOfPages());
+ }
+
+ public void testGetNumberOfBytesPerPage() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ session.createQueue(address, address, true);
+
+ AddressControl addressControl = createManagementControl(address);
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE, addressControl.getNumberOfBytesPerPage());
+
+ session.close();
+ server.stop();
+
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setPageSizeBytes(1024);
+
+ server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+ server.start();
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ session = sf.createSession(false, true, false);
+ session.createQueue(address, address, true);
+ assertEquals(1024, addressControl.getNumberOfBytesPerPage());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
14 years, 7 months
JBoss hornetq SVN: r8078 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 7 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-10 19:35:44 -0400 (Sat, 10 Oct 2009)
New Revision: 8078
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Replication on large message
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -99,6 +99,8 @@
LargeServerMessage createLargeMessage();
+ LargeServerMessage createLargeMessage(byte [] header);
+
void prepare(long txID, Xid xid) throws Exception;
void commit(long txID) throws Exception;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -17,6 +17,8 @@
import java.nio.ByteBuffer;
+import com.sun.org.apache.bcel.internal.generic.StoreInstruction;
+
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -91,11 +93,9 @@
{
file.open();
}
+
+ storageManager.addBytesToLargeMessage(file, this.getMessageID(), bytes);
- file.position(file.size());
-
- file.write(ByteBuffer.wrap(bytes), false);
-
bodySize += bytes.length;
}
@@ -232,6 +232,7 @@
public synchronized void deleteFile() throws Exception
{
validateFile();
+ releaseResources();
storageManager.deleteFile(file);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -18,6 +18,7 @@
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import java.io.File;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -376,7 +377,35 @@
{
return new JournalLargeServerMessage(this);
}
+
+ public void addBytesToLargeMessage(SequentialFile file, long messageId, final byte[] bytes) throws Exception
+ {
+ file.position(file.size());
+ file.write(ByteBuffer.wrap(bytes), false);
+
+ if (isReplicated())
+ {
+ this.replicator.largeMessageWrite(messageId, bytes);
+ }
+ }
+
+ public LargeServerMessage createLargeMessage(byte [] header)
+ {
+ if (isReplicated())
+ {
+ replicator.largeMessageBegin(header);
+ }
+
+ JournalLargeServerMessage largeMessage = new JournalLargeServerMessage(this);
+
+ HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+
+ largeMessage.decodeProperties(headerBuffer);
+
+ return largeMessage;
+ }
+
// Non transactional operations
public void storeMessage(final ServerMessage message) throws Exception
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -19,6 +19,7 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -26,6 +27,7 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -174,6 +176,18 @@
return new NullStorageLargeServerMessage();
}
+ public LargeServerMessage createLargeMessage(byte [] header)
+ {
+ NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
+
+ HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+
+ largeMessage.decodeProperties(headerBuffer);
+
+ return largeMessage;
+ }
+
+
public long generateUniqueID()
{
long id = idSequence.getAndIncrement();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,6 +13,9 @@
package org.hornetq.core.remoting.impl;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
@@ -87,6 +90,9 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
@@ -420,6 +426,21 @@
packet = new ReplicationPageEventMessage();
break;
}
+ case REPLICATION_LARGE_MESSAGE_BEGIN:
+ {
+ packet = new ReplicationLargeMessageBeingMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_END:
+ {
+ packet = new ReplicationLargemessageEndMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_WRITE:
+ {
+ packet = new ReplicationLargeMessageWriteMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -159,7 +159,13 @@
public static final byte REPLICATION_PAGE_EVENT = 88;
+ public static final byte REPLICATION_LARGE_MESSAGE_BEGIN = 89;
+ public static final byte REPLICATION_LARGE_MESSAGE_END = 90;
+
+ public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
+
+
// Static --------------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -35,7 +35,7 @@
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
-
+
private boolean isUpdate;
private byte recordType;
@@ -53,7 +53,11 @@
super(REPLICATION_APPEND);
}
- public ReplicationAddMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddMessage(final byte journalID,
+ final boolean isUpdate,
+ final long id,
+ final byte recordType,
+ final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -65,10 +69,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE +
@@ -115,7 +119,7 @@
{
return journalID;
}
-
+
public boolean isUpdate()
{
return isUpdate;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -32,12 +32,12 @@
// Attributes ----------------------------------------------------
private long txId;
-
+
private long id;
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
-
+
private boolean isUpdate;
private byte recordType;
@@ -55,7 +55,12 @@
super(REPLICATION_APPEND_TX);
}
- public ReplicationAddTXMessage(byte journalID, boolean isUpdate, long txId, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddTXMessage(final byte journalID,
+ final boolean isUpdate,
+ final long txId,
+ final long id,
+ final byte recordType,
+ final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -68,10 +73,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
@@ -113,7 +118,7 @@
{
return id;
}
-
+
public long getTxId()
{
return txId;
@@ -126,7 +131,7 @@
{
return journalID;
}
-
+
public boolean isUpdate()
{
return isUpdate;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -46,7 +46,7 @@
super(REPLICATION_COMMIT_ROLLBACK);
}
- public ReplicationCommitMessage(byte journalID, boolean rollback, long txId)
+ public ReplicationCommitMessage(final byte journalID, final boolean rollback, final long txId)
{
this();
this.journalID = journalID;
@@ -56,6 +56,7 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_LONG;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,7 +13,6 @@
package org.hornetq.core.remoting.impl.wireformat;
-import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.DataConstants;
@@ -45,7 +44,7 @@
super(REPLICATION_DELETE);
}
- public ReplicationDeleteMessage(byte journalID, long id)
+ public ReplicationDeleteMessage(final byte journalID, final long id)
{
this();
this.journalID = journalID;
@@ -54,11 +53,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_LONG;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
}
@@ -92,7 +90,6 @@
return journalID;
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -32,7 +32,7 @@
// Attributes ----------------------------------------------------
private long txId;
-
+
private long id;
/** 0 - Bindings, 1 - MessagesJournal */
@@ -51,7 +51,10 @@
super(REPLICATION_DELETE_TX);
}
- public ReplicationDeleteTXMessage(byte journalID, long txId, long id, EncodingSupport encodingData)
+ public ReplicationDeleteTXMessage(final byte journalID,
+ final long txId,
+ final long id,
+ final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -62,10 +65,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
DataConstants.SIZE_INT +
@@ -101,7 +104,7 @@
{
return id;
}
-
+
public long getTxId()
{
return txId;
@@ -114,7 +117,7 @@
{
return journalID;
}
-
+
/**
* @return the recordData
*/
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationLargeMessageBeingMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationLargeMessageBeingMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ byte header[];
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationLargeMessageBeingMessage(final byte[] header)
+ {
+ this();
+ this.header = header;
+ }
+
+ public ReplicationLargeMessageBeingMessage()
+ {
+ super(REPLICATION_LARGE_MESSAGE_BEGIN);
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + header.length;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(header.length);
+ buffer.writeBytes(header);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ int size = buffer.readInt();
+ header = new byte[size];
+ buffer.readBytes(header);
+ }
+
+ /**
+ * @return the header
+ */
+ public byte[] getHeader()
+ {
+ return header;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationLargeMessageWriteMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationLargeMessageWriteMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long messageId;
+
+ private byte body[];
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ public ReplicationLargeMessageWriteMessage()
+ {
+ super(REPLICATION_LARGE_MESSAGE_WRITE);
+ }
+
+ /**
+ * @param messageId
+ * @param body
+ */
+ public ReplicationLargeMessageWriteMessage(final long messageId, final byte[] body)
+ {
+ this();
+
+ this.messageId = messageId;
+ this.body = body;
+ }
+
+ // Public --------------------------------------------------------
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT + body.length;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(messageId);
+ buffer.writeInt(body.length);
+ buffer.writeBytes(body);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ messageId = buffer.readLong();
+ int size = buffer.readInt();
+ body = new byte[size];
+ buffer.readBytes(body);
+ }
+
+ /**
+ * @return the messageId
+ */
+ public long getMessageId()
+ {
+ return messageId;
+ }
+
+ /**
+ * @return the body
+ */
+ public byte[] getBody()
+ {
+ return body;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationLargemessageEndMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationLargemessageEndMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ long messageId;
+
+ boolean isDelete;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationLargemessageEndMessage()
+ {
+ super(REPLICATION_LARGE_MESSAGE_END);
+ }
+
+ public ReplicationLargemessageEndMessage(final long messageId, final boolean isDelete)
+ {
+ this();
+ this.messageId = messageId;
+ this.isDelete = isDelete;
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BOOLEAN;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(messageId);
+ buffer.writeBoolean(isDelete);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ messageId = buffer.readLong();
+ isDelete = buffer.readBoolean();
+ }
+
+ /**
+ * @return the messageId
+ */
+ public long getMessageId()
+ {
+ return messageId;
+ }
+
+ /**
+ * @return the isDelete
+ */
+ public boolean isDelete()
+ {
+ return isDelete;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -49,7 +49,7 @@
super(REPLICATION_PREPARE);
}
- public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)
+ public ReplicationPrepareMessage(final byte journalID, final long txId, final EncodingSupport encodingData)
{
this();
this.journalID = journalID;
@@ -59,10 +59,10 @@
// Public --------------------------------------------------------
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG +
DataConstants.SIZE_INT +
(encodingData != null ? encodingData.getEncodeSize() : recordData.length);
@@ -100,7 +100,7 @@
{
return journalID;
}
-
+
/**
* @return the recordData
*/
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,7 +13,6 @@
package org.hornetq.core.remoting.impl.wireformat;
-
/**
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
@@ -32,18 +31,17 @@
{
super(REPLICATION_RESPONSE);
}
-
+
// Public --------------------------------------------------------
/* (non-Javadoc)
* @see org.hornetq.core.remoting.Packet#getRequiredBufferSize()
*/
+ @Override
public int getRequiredBufferSize()
{
return BASIC_PACKET_SIZE;
}
-
-
// Package protected ---------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -73,5 +73,13 @@
* @param pageNumber
*/
void pageWrite(PagedMessage message, int pageNumber);
+
+ void largeMessageBegin(byte [] header);
+
+ void largeMessageWrite(long messageId, byte [] body);
+
+ void largeMessageEnd(long messageId);
+
+ void largeMessageDelete(long messageId);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,6 +13,10 @@
package org.hornetq.core.replication.impl;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -33,12 +37,16 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SimpleString;
@@ -72,6 +80,8 @@
private PagingManager pageManager;
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+
+ private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServer server)
@@ -120,7 +130,22 @@
{
handlePageEvent((ReplicationPageEventMessage)packet);
}
-
+ else if (packet.getType() == REPLICATION_LARGE_MESSAGE_BEGIN)
+ {
+ handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
+ }
+ else if (packet.getType() == REPLICATION_LARGE_MESSAGE_WRITE)
+ {
+ handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
+ }
+ else if (packet.getType() == REPLICATION_LARGE_MESSAGE_END)
+ {
+ handleLargeMessageEnd((ReplicationLargemessageEndMessage)packet);
+ }
+ else
+ {
+ log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
+ }
}
catch (Exception e)
{
@@ -171,6 +196,31 @@
{
channel.close();
storage.stop();
+
+ for (ConcurrentMap<Integer, Page> map : pageIndex.values())
+ {
+ for (Page page : map.values())
+ {
+ try
+ {
+ page.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error while closing the page on backup", e);
+ }
+ }
+ }
+
+ pageIndex.clear();
+
+
+ for (LargeServerMessage largeMessage : largeMessages.values())
+ {
+ largeMessage.releaseResources();
+ }
+
+ largeMessages.clear();
}
/* (non-Javadoc)
@@ -194,10 +244,87 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /**
+ * @param packet
+ */
+ private void handleLargeMessageEnd(ReplicationLargemessageEndMessage packet)
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete());
+ if (message != null)
+ {
+ if (packet.isDelete())
+ {
+ try
+ {
+ message.deleteFile();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
+ }
+ }
+ else
+ {
+ try
+ {
+ message.setStored();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
+ }
+ }
+ }
+ }
/**
* @param packet
*/
+ private void handleLargeMessageWrite(ReplicationLargeMessageWriteMessage packet) throws Exception
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false);
+ if (message != null)
+ {
+ message.addBytes(packet.getBody());
+ }
+ }
+
+
+ private LargeServerMessage lookupLargeMessage(long messageId, boolean isDelete)
+ {
+ LargeServerMessage message;
+
+ if (isDelete)
+ {
+ message = largeMessages.remove(messageId);
+ }
+ else
+ {
+ message = largeMessages.get(messageId);
+ }
+
+ if (message == null)
+ {
+ log.warn("Large MessageID " + messageId + " is not available on backup server. Ignoring replication message");
+ }
+
+ return message;
+
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleLargeMessageBegin(ReplicationLargeMessageBeingMessage packet)
+ {
+ LargeServerMessage largeMessage = storage.createLargeMessage(packet.getHeader());
+ this.largeMessages.put(largeMessage.getMessageID(), largeMessage);
+ }
+
+
+ /**
+ * @param packet
+ */
private void handleCommitRollback(final ReplicationCommitMessage packet) throws Exception
{
Journal journalToUse = getJournal(packet.getJournalID());
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -33,6 +33,9 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
@@ -252,8 +255,54 @@
sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageBegin(byte[])
+ */
+ public void largeMessageBegin(byte[] header)
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationLargeMessageBeingMessage(header));
+ }
+ }
/* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageDelete(long)
+ */
+ public void largeMessageDelete(long messageId)
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, true));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageEnd(long)
+ */
+ public void largeMessageEnd(long messageId)
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, false));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])
+ */
+ public void largeMessageWrite(long messageId, byte[] body)
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
+ }
+ }
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
public synchronized boolean isStarted()
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -164,7 +164,6 @@
private final SimpleString nodeID;
// The current currentLargeMessage being processed
- // In case of replication, currentLargeMessage should only be accessed within the replication callbacks
private volatile LargeServerMessage currentLargeMessage;
private ServerSessionPacketHandler handler;
@@ -1688,13 +1687,7 @@
private LargeServerMessage createLargeMessageStorage(final byte[] header) throws Exception
{
- LargeServerMessage largeMessage = storageManager.createLargeMessage();
-
- HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
-
- largeMessage.decodeProperties(headerBuffer);
-
- return largeMessage;
+ return storageManager.createLargeMessage(header);
}
private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-10 23:35:44 UTC (rev 8078)
@@ -67,6 +67,7 @@
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.SimpleString;
+import org.jboss.netty.channel.Channels;
/**
* A ReplicationTest
@@ -231,8 +232,23 @@
manager.pageDeleted(dummy, 4);
manager.pageDeleted(dummy, 5);
manager.pageDeleted(dummy, 6);
+
+ blockOnReplication(manager);
+ ServerMessageImpl serverMsg = new ServerMessageImpl();
+ serverMsg.setMessageID(500);
+ serverMsg.setDestination(new SimpleString("tttt"));
+
+
+ HornetQBuffer buffer = ChannelBuffers.dynamicBuffer(100);
+ serverMsg.encodeProperties(buffer);
+
+ manager.largeMessageBegin(buffer.array());
+ manager.largeMessageWrite(500, new byte[1024]);
+
+ manager.largeMessageEnd(500);
+
blockOnReplication(manager);
store.start();
14 years, 7 months
JBoss hornetq SVN: r8077 - branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-09 16:24:02 -0400 (Fri, 09 Oct 2009)
New Revision: 8077
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java
Log:
tweak
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java 2009-10-09 13:36:27 UTC (rev 8076)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java 2009-10-09 20:24:02 UTC (rev 8077)
@@ -53,12 +53,11 @@
}
// Public --------------------------------------------------------
-
+
+ @Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
- DataConstants.SIZE_INT +
- pagedMessage.getEncodeSize();
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + pagedMessage.getEncodeSize();
}
@@ -72,7 +71,7 @@
@Override
public void decodeBody(final HornetQBuffer buffer)
{
- this.pageNumber = buffer.readInt();
+ pageNumber = buffer.readInt();
pagedMessage = new PagedMessageImpl();
pagedMessage.decode(buffer);
}
@@ -92,10 +91,7 @@
{
return pagedMessage;
}
-
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
14 years, 7 months
JBoss hornetq SVN: r8076 - trunk/src/config/common/schema.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-09 09:36:27 -0400 (Fri, 09 Oct 2009)
New Revision: 8076
Modified:
trunk/src/config/common/schema/hornetq-configuration.xsd
Log:
configuration schema
* fixed send-to-dla-on-no-route type to xsd:boolean
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-10-09 12:26:24 UTC (rev 8075)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-10-09 13:36:27 UTC (rev 8076)
@@ -450,7 +450,7 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="redistribution-delay" type="xsd:long">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="send-to-dla-on-no-route" type="boolean">
+ <xsd:element maxOccurs="1" minOccurs="0" name="send-to-dla-on-no-route" type="xsd:boolean">
</xsd:element>
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required"/>
14 years, 7 months
JBoss hornetq SVN: r8075 - in trunk: docs/user-manual/en and 6 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-09 08:26:24 -0400 (Fri, 09 Oct 2009)
New Revision: 8075
Modified:
trunk/build-hornetq.xml
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/configuring-transports.xml
trunk/docs/user-manual/en/management.xml
trunk/docs/user-manual/en/queue-attributes.xml
trunk/docs/user-manual/en/send-guarantees.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
some docs changes plus set block on persistent and non transactional to default to true
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/build-hornetq.xml 2009-10-09 12:26:24 UTC (rev 8075)
@@ -569,6 +569,7 @@
<target name="jar"
depends="jar-core, jar-core-client, jar-jms, jar-jms-client, jar-transports, jar-jboss-integration, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client">
</target>
+
<target name="jar-jnp-client" depends="init">
<jar jarfile="${build.jars.dir}/${jnp.client.jar.name}">
<zipfileset src="${org.jboss.naming.lib}/jnpserver.jar">
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-10-09 12:26:24 UTC (rev 8075)
@@ -195,7 +195,7 @@
<entry>Boolean</entry>
<entry>if true wait for non transaction data to be synced to the journal
before returning response to client.</entry>
- <entry>false</entry>
+ <entry>true</entry>
</row>
<row>
<entry><link linkend="configuring.message.journal.journal-type"
@@ -213,9 +213,10 @@
</row>
<row>
<entry><link linkend="management.jmx.configuration"
- >jmx-domain</link></entry>
+ >jmx-domain</link></entry>
<entry>String</entry>
- <entry>the JMX domain used to registered HornetQ MBeans in the MBeanServer</entry>
+ <entry>the JMX domain used to registered HornetQ MBeans in the
+ MBeanServer</entry>
<entry>org.hornetq</entry>
</row>
<row>
@@ -842,7 +843,7 @@
>connection-factory.block-on-persistent-send</link></entry>
<entry>Boolean</entry>
<entry>whether or not persistent messages are sent synchronously</entry>
- <entry>false</entry>
+ <entry>true</entry>
</row>
<row>
<entry>connection-factory.call-timeout</entry>
Modified: trunk/docs/user-manual/en/configuring-transports.xml
===================================================================
--- trunk/docs/user-manual/en/configuring-transports.xml 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/docs/user-manual/en/configuring-transports.xml 2009-10-09 12:26:24 UTC (rev 8075)
@@ -204,7 +204,9 @@
name or IP address to connect to (when configuring a connector) or to listen
on (when configuring an acceptor). The default value for this property is
<literal>localhost</literal>. When configuring acceptors, multiple hosts
- or IP addresses can be specified by separating them with commas.</para>
+ or IP addresses can be specified by separating them with commas. It's not
+ valid to specify multiple addresses when specifying the host for a
+ connector; a connector makes a connection to one specific address.</para>
<note>
<para>Don't forget to specify a host name or ip address! If you want your
server able to accept connections from other nodes you must specify a
@@ -212,6 +214,11 @@
incoming connections. The default is localhost which of course is not
accessible from remote nodes!</para>
</note>
+ <note>
+ <para>Although an address 0.0.0.0 is sometimes used by other systems to mean
+ "bind to all available addresses", this is not a valid address for a
+ Netty acceptor to bind to.</para>
+ </note>
</listitem>
<listitem>
<para><literal>hornetq.remoting.netty.port</literal>. This specified the port to
Modified: trunk/docs/user-manual/en/management.xml
===================================================================
--- trunk/docs/user-manual/en/management.xml 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/docs/user-manual/en/management.xml 2009-10-09 12:26:24 UTC (rev 8075)
@@ -76,8 +76,8 @@
<itemizedlist>
<listitem>
<para>Listing, creating, deploying and destroying queues</para>
- <para>A list of deployed core queues can be retrieved using the
- <literal>getQueueNames()</literal> method.</para>
+ <para>A list of deployed core queues can be retrieved using the <literal
+ >getQueueNames()</literal> method.</para>
<para>Core queues can be created or destroyed using the management operations
<literal>createQueue()</literal> or <literal>deployQueue()</literal> or
<literal>destroyQueue()</literal>)on the <literal
@@ -88,6 +88,13 @@
<literal>deployQueue</literal> will do nothing.</para>
</listitem>
<listitem>
+ <para>Pausing and resuming Queues</para>
+ <para>The <literal>QueueControl</literal> can pause and resume the underlying
+ queue. When a queue is paused, it will receive messages but will not deliver
+ them. When it's resumed, it'll begin delivering the queued messages, if any.
+ </para>
+ </listitem>
+ <listitem>
<para>Listing and closing remote connections</para>
<para>Client's remote addresses can be retrieved using <literal
>listRemoteAddresses()</literal>. It is also possible to close the
@@ -106,9 +113,9 @@
Strings.) To commit or rollback a given prepared transaction, the <literal
>commitPreparedTransaction</literal>() or <literal
>rollbackPreparedTransaction()</literal> method can be used to resolve
- heuristic transactions. Heuristically completed transactions can be listed using
- the <literal>listHeuristicCommittedTransactions()</literal> and
- <literal>listHeuristicRolledBackTransactions</literal> methods.</para>
+ heuristic transactions. Heuristically completed transactions can be listed
+ using the <literal>listHeuristicCommittedTransactions()</literal> and <literal
+ >listHeuristicRolledBackTransactions</literal> methods.</para>
</listitem>
<listitem>
<para>Enabling and resetting Message counters</para>
@@ -177,9 +184,10 @@
<para>Messages can also be removed from the queue by using the <literal
>removeMessages()</literal> method which returns a <literal
>boolean</literal> for the single message ID variant or the number of
- removed messages for the filter variant. The <literal>removeMessages()</literal>
- method takes a <literal>filter</literal> argument to remove only filtered messages. Setting
- the filter to an empty string will in effect remove all messages.</para>
+ removed messages for the filter variant. The <literal
+ >removeMessages()</literal> method takes a <literal>filter</literal>
+ argument to remove only filtered messages. Setting the filter to an empty
+ string will in effect remove all messages.</para>
</listitem>
<listitem>
<para>Counting messages</para>
@@ -211,12 +219,12 @@
if it was created with one, <literal>isDurable()</literal> to know wether the
queue is durable or not, etc.)</para>
</listitem>
- <listitem>
+ <listitem>
<para>Pausing and resuming Queues</para>
- <para>The <literal>QueueControl</literal> can pause and resume the underlying queue.
- When a queue is paused, it will receive messages but will not deliver them. When it's resume, it'll begin
- delivering the queued messages, if any.
- </para>
+ <para>The <literal>QueueControl</literal> can pause and resume the underlying
+ queue. When a queue is paused, it will receive messages but will not deliver
+ them. When it's resume, it'll begin delivering the queued messages, if any.
+ </para>
</listitem>
</itemizedlist>
</section>
@@ -310,8 +318,8 @@
<itemizedlist>
<listitem>
<para>Listing, creating, destroying connection factories</para>
- <para>Names of the deployed connection factories can be retrieved by the
- <literal>getConnectionFactoryNames()</literal> method.</para>
+ <para>Names of the deployed connection factories can be retrieved by the <literal
+ >getConnectionFactoryNames()</literal> method.</para>
<para>JMS connection factories can be created or destroyed using the <literal
>createConnectionFactory()</literal> methods or <literal
>destroyConnectionFactory()</literal> methods. These connection factories
@@ -328,8 +336,8 @@
</listitem>
<listitem>
<para>Listing, creating, destroying queues</para>
- <para>Names of the deployed JMS queues can be retrieved by the
- <literal>getQueueNames()</literal> method.</para>
+ <para>Names of the deployed JMS queues can be retrieved by the <literal
+ >getQueueNames()</literal> method.</para>
<para>JMS queues can be created or destroyed using the <literal
>createQueue()</literal> methods or <literal>destroyQueue()</literal>
methods. These queues are bound to JNDI so that JMS clients can look them
@@ -337,8 +345,8 @@
</listitem>
<listitem>
<para>Listing, creating/destroying topics</para>
- <para>Names of the deployed topics can be retrieved by the
- <literal>getTopicNames()</literal> method.</para>
+ <para>Names of the deployed topics can be retrieved by the <literal
+ >getTopicNames()</literal> method.</para>
<para>JMS topics can be created or destroyed using the <literal
>createTopic()</literal> or <literal>destroyTopic()</literal> methods. These
topics are bound to JNDI so that JMS clients can look them up</para>
@@ -407,10 +415,10 @@
<para>Messages can also be removed from the queue by using the <literal
>removeMessages()</literal> method which returns a <literal
>boolean</literal> for the single message ID variant or the number of
- removed messages for the filter variant.
- The <literal>removeMessages()</literal>
- method takes a <literal>filter</literal> argument to remove only filtered messages. Setting
- the filter to an empty string will in effect remove all messages.</para>
+ removed messages for the filter variant. The <literal
+ >removeMessages()</literal> method takes a <literal>filter</literal>
+ argument to remove only filtered messages. Setting the filter to an empty
+ string will in effect remove all messages.</para>
</listitem>
<listitem>
<para>Counting messages</para>
@@ -440,12 +448,12 @@
is temporary or not, <literal>isDurable()</literal> to know wether the queue is
durable or not, etc.)</para>
</listitem>
- <listitem>
+ <listitem>
<para>Pausing and resuming queues</para>
- <para>The <literal>JMSQueueControl</literal> can pause and resume the underlying queue.
- When the queue is paused it will continue to receive messages but will not deliver them.
- When resumed again it will deliver the enqueued messages, if any.
- </para>
+ <para>The <literal>JMSQueueControl</literal> can pause and resume the underlying
+ queue. When the queue is paused it will continue to receive messages but will
+ not deliver them. When resumed again it will deliver the enqueued messages, if
+ any. </para>
</listitem>
</itemizedlist>
</section>
@@ -519,10 +527,10 @@
>Java Management guide</ulink> to configure the server for remote management (system
properties must be set in <literal>run.sh</literal> or <literal>run.bat</literal>
scripts).</para>
- <para>By default, HornetQ server uses the JMX domain "org.hornetq". To manage several HornetQ servers
- from the <emphasis>same</emphasis> MBeanServer, the JMX domain can be configured for each individual
- HornetQ server by setting <literal>jmx-domain</literal> in <literal>hornetq-configuration.xml</literal>:
- </para>
+ <para>By default, HornetQ server uses the JMX domain "org.hornetq". To manage several
+ HornetQ servers from the <emphasis>same</emphasis> MBeanServer, the JMX domain can be
+ configured for each individual HornetQ server by setting <literal>jmx-domain</literal>
+ in <literal>hornetq-configuration.xml</literal>: </para>
<programlisting>
<!-- use a specific JMX domain for HornetQ MBeans -->
<jmx-domain>my.org.hornetq</jmx-domain>
Modified: trunk/docs/user-manual/en/queue-attributes.xml
===================================================================
--- trunk/docs/user-manual/en/queue-attributes.xml 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/docs/user-manual/en/queue-attributes.xml 2009-10-09 12:26:24 UTC (rev 8075)
@@ -86,9 +86,9 @@
</section>
<section id="queue-attributes.address-settings">
<title>Configuring Queues Via Address Settings</title>
- <para>There are some attributes that are defined against a queue rather than a specific
- queue. Here an example of an <literal>address-setting</literal> entry that would be
- found in the <literal>hornetq-configuration.xml</literal> file.</para>
+ <para>There are some attributes that are defined against an address wildcard rather than a
+ specific queue. Here an example of an <literal>address-setting</literal> entry that
+ would be found in the <literal>hornetq-configuration.xml</literal> file.</para>
<programlisting><address-settings>
<address-setting match="jms.queue.exampleQueue">
<dead-letter-address>jms.queue.deadLetterQueue</dead-letter-address>
@@ -100,9 +100,17 @@
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>20000</page-size-bytes>
<redistribution-delay>0</redistribution-delay>
+ <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
</address-setting>
</address-settings></programlisting>
- <para>These are explained fully throughout the user manual, howvere here is a brief
+ <para>The idea with address settings, is you can provide a block of settings which will be
+ applied against any adresses that match the string in the <literal>match</literal> attribute. In the
+ above example the settings would only be applied to any addresses which exactly match
+ the address <literal>jms.queue.exampleQueue</literal>, but you can also use wildcards to apply sets of
+ configuration against many addresses. The wildcard syntax used is described <link linkend="wildcard-syntax">here</link>.</para>
+ <para>For example, if you used the <literal>match</literal> string <literal>jms.queue.#</literal> the settings would be applied
+ to all addresses which start with <literal>jms.queue.</literal> which would be all JMS queues.</para>
+ <para>The meaning of the specific settings are explained fully throughout the user manual, however here is a brief
description with a link to the appropriate chapter if available. </para>
<para><literal>max-delivery-attempts</literal> defines how many time a cancelled message can
be redelivered before sending to the <literal>dead-letter-address</literal>. A full
@@ -115,7 +123,7 @@
see <link linkend="message-expiry.configuring">here</link>.</para>
<para><literal>last-value-queue</literal> defines whether a queue only uses last values or
not. see <link linkend="last-value-queues">here</link>.</para>
- <para><literal>distribution-policy-class</literal> define the class to use for distribution
+ <para><literal>distribution-policy-class</literal> defines the class to use for distribution
of messages by a queue to consumers. By default this is <literal
>org.hornetq.core.server.impl.RoundRobinDistributor</literal>.</para>
<para><literal>max-size-bytes</literal> and <literal>page-size-bytes</literal> are used to
@@ -123,5 +131,9 @@
<para><literal>redistribution-delay</literal> defines how long to wait when the last
consumer is closed on a queue before redistributing any messages. see <link
linkend="clusters.message-redistribution">here</link>.</para>
+ <para><literal>send-to-dla-on-no-route</literal>. If a message is sent to an address, but the server does not route it to any queues,
+ for example, there might be no queues bound to that address, or none of the queues have filters that match, then normally that message
+ would be discarded. However if this parameter is set to true for that address, if the message is not routed to any queues it will instead
+ be sent to the dead letter address (DLA) for that address, if it exists.</para>
</section>
</chapter>
Modified: trunk/docs/user-manual/en/send-guarantees.xml
===================================================================
--- trunk/docs/user-manual/en/send-guarantees.xml 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/docs/user-manual/en/send-guarantees.xml 2009-10-09 12:26:24 UTC (rev 8075)
@@ -50,7 +50,7 @@
<para><literal>BlockOnPersistentSend</literal>. If this is set to <literal
>true</literal> then all calls to send for persistent messages on non
transacted sessions will block until the message has reached the server, and a
- response has been sent back. The default value is <literal>false</literal>.
+ response has been sent back. The default value is <literal>true</literal>.
</para>
</listitem>
<listitem>
@@ -85,7 +85,7 @@
>journal-sync-non-transactional</literal> is set to <literal>true</literal> the
server will not send a response back to the client until the message has been persisted
and the server has a guarantee that the data has been persisted to disk. The default
- value for this parameter is <literal>false</literal>.</para>
+ value for this parameter is <literal>true</literal>.</para>
</section>
<section id="send-guarantees.nontrans.acks">
<title>Guarantees of Non Transactional Acknowledgements</title>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-10-09 12:26:24 UTC (rev 8075)
@@ -450,6 +450,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="redistribution-delay" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="send-to-dla-on-no-route" type="boolean">
+ </xsd:element>
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required"/>
</xsd:complexType>
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-10-09 12:26:24 UTC (rev 8075)
@@ -79,7 +79,7 @@
public static final boolean DEFAULT_BLOCK_ON_ACKNOWLEDGE = false;
- public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = false;
+ public static final boolean DEFAULT_BLOCK_ON_PERSISTENT_SEND = true;
public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-09 12:26:24 UTC (rev 8075)
@@ -89,7 +89,7 @@
public static final boolean DEFAULT_JOURNAL_SYNC_TRANSACTIONAL = true;
- public static final boolean DEFAULT_JOURNAL_SYNC_NON_TRANSACTIONAL = false;
+ public static final boolean DEFAULT_JOURNAL_SYNC_NON_TRANSACTIONAL = true;
public static final int DEFAULT_JOURNAL_FILE_SIZE = 10485760;
Modified: trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java 2009-10-09 12:26:24 UTC (rev 8075)
@@ -28,7 +28,6 @@
public class AddressSettingsDeployer extends XmlDeployer
{
private static final Logger log = Logger.getLogger(AddressSettingsDeployer.class);
-
private static final String DEAD_LETTER_ADDRESS_NODE_NAME = "dead-letter-address";
@@ -51,6 +50,8 @@
private static final String LVQ_NODE_NAME = "last-value-queue";
private static final String REDISTRIBUTION_DELAY_NODE_NAME = "redistribution-delay";
+
+ private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route";
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -139,6 +140,10 @@
{
addressSettings.setRedistributionDelay(Long.valueOf(child.getTextContent().trim()));
}
+ else if (SEND_TO_DLA_ON_NO_ROUTE.equalsIgnoreCase(child.getNodeName()))
+ {
+ addressSettings.setSendToDLAOnNoRoute(Boolean.valueOf(child.getTextContent().trim()));
+ }
}
addressSettingsRepository.addMatch(match, addressSettings);
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-10-09 12:26:24 UTC (rev 8075)
@@ -53,6 +53,5 @@
ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
- void setOriginalHeaders(ServerMessage other, boolean expiry);
-
+ void setOriginalHeaders(ServerMessage other, boolean expiry);
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-09 02:45:05 UTC (rev 8074)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-09 12:26:24 UTC (rev 8075)
@@ -642,6 +642,7 @@
csf.setRetryInterval(retryInterval);
csf.setRetryIntervalMultiplier(retryIntervalMultiplier);
csf.setReconnectAttempts(reconnectAttempts);
+ csf.setBlockOnPersistentSend(false);
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(clusterUser, clusterPassword, false, true, true, true, 1);
14 years, 7 months