[hornetq-commits] JBoss hornetq SVN: r9066 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Apr 7 08:12:03 EDT 2010
Author: timfox
Date: 2010-04-07 08:12:01 -0400 (Wed, 07 Apr 2010)
New Revision: 9066
Added:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
Removed:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java
Modified:
trunk/docs/user-manual/en/client-reconnection.xml
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-275
Modified: trunk/docs/user-manual/en/client-reconnection.xml
===================================================================
--- trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 12:12:01 UTC (rev 9066)
@@ -50,11 +50,10 @@
instance using the appropriate setter method.</para>
<para>If you're using core you can set these values directly on the <literal
>ClientSessionFactory</literal> instance using the appropriate setter method.</para>
- <para>The window is specified in bytes, and has a default value of <literal
- >1MiB</literal>.</para>
+ <para>The window is specified in bytes.</para>
<para>Setting this parameter to <literal>-1</literal> disables any buffering and prevents
any re-attachment from occurring, forcing reconnect instead. The default value for this
- parameter is <literal>-1</literal>.</para>
+ parameter is <literal>-1</literal>, which means that by default no automatic re-attachment will occur.</para>
</section>
<section>
<title>Session reconnection</title>
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -79,7 +79,8 @@
private final int ackBatchSize;
- private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(false, ClientConsumerImpl.NUM_PRIORITIES);
+ private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(false,
+ ClientConsumerImpl.NUM_PRIORITIES);
private final Runner runner = new Runner();
@@ -114,9 +115,11 @@
private boolean stopped = false;
private final AtomicLong forceDeliveryCount = new AtomicLong(0);
-
+
private final SessionQueueQueryResponseMessage queueInfo;
+ private volatile boolean ackIndividually;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -151,7 +154,7 @@
this.clientWindowSize = clientWindowSize;
this.ackBatchSize = ackBatchSize;
-
+
this.queueInfo = queueInfo;
}
@@ -192,7 +195,7 @@
// Effectively infinite
timeout = Long.MAX_VALUE;
}
-
+
boolean deliveryForced = false;
long start = -1;
@@ -414,6 +417,8 @@
lastAckedMessage = null;
creditsToSend = 0;
+
+ ackIndividually = false;
}
public synchronized void start()
@@ -435,7 +440,7 @@
{
return queueInfo;
}
-
+
public long getID()
{
return id;
@@ -463,16 +468,24 @@
// This is ok - we just ignore the message
return;
}
-
+
ClientMessageInternal messageToHandle = message;
-
+
if (messageToHandle.getAddress() == null)
{
messageToHandle.setAddressTransient(queueInfo.getAddress());
}
messageToHandle.onReceipt(this);
-
+
+ if (message.getPriority() != 4)
+ {
+ // We have messages of different priorities, so we need to ack them individually: their order
+ // in the ServerConsumerImpl delivery list might not be the same as the order in which they are
+ // consumed, which means that acking everything up to a given message won't work
+ ackIndividually = true;
+ }
+
// Add it to the buffer
buffer.addLast(messageToHandle, messageToHandle.getPriority());
@@ -546,9 +559,9 @@
// Need to send credits for the messages in the buffer
HQIterator<ClientMessageInternal> iter = buffer.iterator();
-
+
ClientMessageInternal message;
-
+
while ((message = iter.next()) != null)
{
flowControlBeforeConsumption(message);
@@ -575,16 +588,28 @@
public void acknowledge(final ClientMessage message) throws HornetQException
{
ClientMessageInternal cmi = (ClientMessageInternal)message;
-
- ackBytes += message.getEncodeSize();
- if (ackBytes >= ackBatchSize)
+ if (ackIndividually)
{
- doAck(cmi);
+ if (lastAckedMessage != null)
+ {
+ flushAcks();
+ }
+
+ session.individualAcknowledge(id, message.getMessageID());
}
else
{
- lastAckedMessage = cmi;
+ ackBytes += message.getEncodeSize();
+
+ if (ackBytes >= ackBatchSize)
+ {
+ doAck(cmi);
+ }
+ else
+ {
+ lastAckedMessage = cmi;
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -51,8 +51,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
@@ -576,10 +577,10 @@
}
public ClientMessage createMessage(final byte type,
- final boolean durable,
- final long expiration,
- final long timestamp,
- final byte priority)
+ final boolean durable,
+ final long expiration,
+ final long timestamp,
+ final byte priority)
{
return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize);
}
@@ -712,6 +713,30 @@
}
}
+ public void individualAcknowledge(final long consumerID, final long messageID) throws HornetQException
+ {
+ // if we're pre-acknowledging then we don't need to do anything
+ if (preAcknowledge)
+ {
+ return;
+ }
+
+ checkClosed();
+
+ SessionIndividualAcknowledgeMessage message = new SessionIndividualAcknowledgeMessage(consumerID,
+ messageID,
+ blockOnAcknowledge);
+
+ if (blockOnAcknowledge)
+ {
+ channel.sendBlocking(message);
+ }
+ else
+ {
+ channel.sendBatched(message);
+ }
+ }
+
public void expire(final long consumerID, final long messageID) throws HornetQException
{
checkClosed();
@@ -719,7 +744,7 @@
// We don't send expiries for pre-ack since message will already have been acked on server
if (!preAcknowledge)
{
- SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+ SessionExpireMessage message = new SessionExpireMessage(consumerID, messageID);
channel.send(message);
}
@@ -851,9 +876,9 @@
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
-
+
int lcid = channel.getLastConfirmedCommandID();
-
+
Packet request = new ReattachSessionMessage(name, lcid);
Channel channel1 = backupConnection.getChannel(1, -1);
@@ -864,11 +889,11 @@
{
// The session was found on the server - we reattached transparently ok
- channel.replayCommands(response.getLastConfirmedCommandID());
+ channel.replayCommands(response.getLastConfirmedCommandID());
}
else
{
-
+
// The session wasn't found on the server - probably we're failing over onto a backup server where the
// session won't exist or the target server has been restarted - in this case the session will need to be
// recreated,
@@ -892,7 +917,8 @@
autoCommitAcks,
preAcknowledge,
confirmationWindowSize,
- defaultAddress == null ? null : defaultAddress.toString());
+ defaultAddress == null ? null
+ : defaultAddress.toString());
boolean retry = false;
do
{
@@ -924,7 +950,7 @@
for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
{
SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
-
+
// We try and recreate any non durable queues, since they probably won't be there unless
// they are defined in hornetq-configuration.xml
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
@@ -950,7 +976,7 @@
false);
sendPacketWithoutLock(createConsumerRequest);
-
+
int clientWindowSize = entry.getValue().getClientWindowSize();
if (clientWindowSize != 0)
@@ -995,7 +1021,7 @@
channel.returnBlocking();
}
- channel.setTransferring(false);
+ channel.setTransferring(false);
}
catch (Throwable t)
{
@@ -1014,15 +1040,15 @@
// not having any credits to send
}
}
-
+
private volatile SimpleString defaultAddress;
-
+
public void setAddress(final Message message, final SimpleString address)
{
if (defaultAddress == null)
{
defaultAddress = address;
-
+
message.setAddress(address);
}
else
@@ -1037,9 +1063,7 @@
}
}
}
-
-
-
+
public void setPacketSize(final int packetSize)
{
if (packetSize > this.initialMessagePacketSize)
@@ -1083,7 +1107,7 @@
{
return producerCreditManager.getCredits(address, anon);
}
-
+
public void returnCredits(final SimpleString address)
{
producerCreditManager.returnCredits(address);
@@ -1093,7 +1117,7 @@
{
producerCreditManager.receiveCredits(address, credits);
}
-
+
public ClientProducerCreditManager getProducerCreditManager()
{
return producerCreditManager;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -33,6 +33,8 @@
String getName();
void acknowledge(long consumerID, long messageID) throws HornetQException;
+
+ void individualAcknowledge(long consumerID, long messageID) throws HornetQException;
boolean isCacheLargeMessageClient();
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -103,6 +103,11 @@
{
session.acknowledge(consumerID, messageID);
}
+
+ public void individualAcknowledge(final long consumerID, final long messageID) throws HornetQException
+ {
+ session.individualAcknowledge(consumerID, messageID);
+ }
public void addConsumer(final ClientConsumerInternal consumer)
{
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -24,6 +24,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_SEND;
@@ -67,8 +68,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
@@ -281,7 +283,7 @@
}
case SESS_EXPIRED:
{
- SessionExpiredMessage message = (SessionExpiredMessage)packet;
+ SessionExpireMessage message = (SessionExpireMessage)packet;
session.expire(message.getConsumerID(), message.getMessageID());
break;
}
@@ -414,6 +416,17 @@
closeChannel = true;
break;
}
+ case SESS_INDIVIDUAL_ACKNOWLEDGE:
+ {
+ SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage)packet;
+ requiresResponse = message.isRequiresResponse();
+ session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
+ if (requiresResponse)
+ {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
case SESS_CONSUMER_CLOSE:
{
requiresResponse = true;
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -188,13 +188,15 @@
{
resendCache.add(packet);
}
-
- connection.getTransportConnection().write(buffer, flush, batch);
}
finally
{
lock.unlock();
}
+
+ // The actual send must happen outside the lock; otherwise, with the OIO transport, the write can block if the TCP
+ // buffer is full, preventing any incoming buffers from being handled and blocking failover
+ connection.getTransportConnection().write(buffer, flush, batch);
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -48,6 +48,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_CREDITS;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
@@ -114,8 +115,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
@@ -216,7 +218,7 @@
}
case SESS_EXPIRED:
{
- packet = new SessionExpiredMessage();
+ packet = new SessionExpireMessage();
break;
}
case SESS_COMMIT:
@@ -379,6 +381,11 @@
packet = new SessionConsumerCloseMessage();
break;
}
+ case SESS_INDIVIDUAL_ACKNOWLEDGE:
+ {
+ packet = new SessionIndividualAcknowledgeMessage();
+ break;
+ }
case NULL_RESPONSE:
{
packet = new NullResponseMessage();
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -149,6 +149,8 @@
public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
public static final byte SESS_PRODUCER_CREDITS = 80;
+
+ public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
// Replication
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -29,7 +29,7 @@
private long consumerID;
private long messageID;
-
+
private boolean requiresResponse;
// Static --------------------------------------------------------
Copied: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java (from rev 9052, trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpireMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionExpireMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+
+ private long messageID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionExpireMessage(final long consumerID, final long messageID)
+ {
+ super(PacketImpl.SESS_EXPIRED);
+
+ this.consumerID = consumerID;
+
+ this.messageID = messageID;
+ }
+
+ public SessionExpireMessage()
+ {
+ super(PacketImpl.SESS_EXPIRED);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+
+ buffer.writeLong(messageID);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ consumerID = buffer.readLong();
+
+ messageID = buffer.readLong();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof SessionExpireMessage == false)
+ {
+ return false;
+ }
+
+ SessionExpireMessage r = (SessionExpireMessage)other;
+
+ return super.equals(other) && consumerID == r.consumerID && messageID == r.messageID;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExpiredMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -1,98 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- */
-public class SessionExpiredMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long consumerID;
-
- private long messageID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionExpiredMessage(final long consumerID, final long messageID)
- {
- super(PacketImpl.SESS_EXPIRED);
-
- this.consumerID = consumerID;
-
- this.messageID = messageID;
- }
-
- public SessionExpiredMessage()
- {
- super(PacketImpl.SESS_EXPIRED);
- }
-
- // Public --------------------------------------------------------
-
- public long getConsumerID()
- {
- return consumerID;
- }
-
- public long getMessageID()
- {
- return messageID;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeLong(consumerID);
-
- buffer.writeLong(messageID);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- consumerID = buffer.readLong();
-
- messageID = buffer.readLong();
- }
-
- @Override
- public boolean equals(final Object other)
- {
- if (other instanceof SessionExpiredMessage == false)
- {
- return false;
- }
-
- SessionExpiredMessage r = (SessionExpiredMessage)other;
-
- return super.equals(other) && consumerID == r.consumerID && messageID == r.messageID;
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionIndividualAcknowledgeMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+
+ private long messageID;
+
+ private boolean requiresResponse;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionIndividualAcknowledgeMessage(final long consumerID, final long messageID, final boolean requiresResponse)
+ {
+ super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+
+ this.consumerID = consumerID;
+
+ this.messageID = messageID;
+
+ this.requiresResponse = requiresResponse;
+ }
+
+ public SessionIndividualAcknowledgeMessage()
+ {
+ super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+
+ buffer.writeLong(messageID);
+
+ buffer.writeBoolean(requiresResponse);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ consumerID = buffer.readLong();
+
+ messageID = buffer.readLong();
+
+ requiresResponse = buffer.readBoolean();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof SessionIndividualAcknowledgeMessage == false)
+ {
+ return false;
+ }
+
+ SessionIndividualAcknowledgeMessage r = (SessionIndividualAcknowledgeMessage)other;
+
+ return super.equals(other) && consumerID == r.consumerID &&
+ messageID == r.messageID &&
+ requiresResponse == r.requiresResponse;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -38,9 +38,11 @@
Queue getQueue();
- MessageReference getExpired(long messageID) throws Exception;
+ MessageReference removeReferenceByID(long messageID) throws Exception;
void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
+
+ void individualAcknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception;
void forceDelivery(long sequence);
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -18,7 +18,6 @@
import javax.transaction.xa.Xid;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.spi.core.protocol.SessionCallback;
/**
*
@@ -43,6 +42,8 @@
void removeConsumer(long consumerID) throws Exception;
void acknowledge(long consumerID, long messageID) throws Exception;
+
+ void individualAcknowledge(long consumerID, long messageID) throws Exception;
void expire(long consumerID, long messageID) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -517,8 +517,32 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
+
+ public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+ {
+ if (browseOnly)
+ {
+ return;
+ }
+
+ MessageReference ref = removeReferenceByID(messageID);
+
+ if (ref == null)
+ {
+ throw new IllegalStateException("Cannot find ref to ack " + messageID);
+ }
+
+ if (autoCommitAcks)
+ {
+ ref.getQueue().acknowledge(ref);
+ }
+ else
+ {
+ ref.getQueue().acknowledge(tx, ref);
+ }
+ }
- public MessageReference getExpired(final long messageID) throws Exception
+ public MessageReference removeReferenceByID(final long messageID) throws Exception
{
if (browseOnly)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -473,10 +473,22 @@
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
+
+ public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
+ {
+ ServerConsumer consumer = consumers.get(consumerID);
+
+ if (this.xa && tx == null)
+ {
+ throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
+ }
+ consumer.individualAcknowledge(autoCommitAcks, tx, messageID);
+ }
+
public void expire(final long consumerID, final long messageID) throws Exception
{
- MessageReference ref = consumers.get(consumerID).getExpired(messageID);
+ MessageReference ref = consumers.get(consumerID).removeReferenceByID(messageID);
if (ref != null)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -201,7 +201,68 @@
session.deleteQueue(queue);
}
+
+ // https://jira.jboss.org/jira/browse/HORNETQ-275
+ public void testOutOfOrderAcknowledgement() throws Exception
+ {
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+
+ ClientConsumer consumer = session.createConsumer(queue);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage m = createTextMessage(Integer.toString(i), session);
+ m.setPriority((byte)i);
+ producer.send(m);
+
+ Thread.sleep(20);
+ }
+
+ // The short sleep after each send above gives the messages time to reach the client side buffer
+
+ // They should have been added to the delivering list in the ServerConsumerImpl in the order
+ // they were sent, not priority order
+
+ //We receive one of the messages
+ ClientMessage m = consumer.receive(500);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(9, m.getPriority());
+
+ //Ack it
+ m.acknowledge();
+
+ consumer.close();
+
+ //The consumer is now closed; re-create it and try to receive the other ones
+
+ consumer = session.createConsumer(queue);
+
+ // Other messages should be received now
+ // Previously there was a bug whereby if deliveries were stored on server side in send order
+ // then if received in priority order, and acked
+ // the ack would ack all messages up to the one received - resulting in acking
+ // messages that hadn't been delivered yet
+ for (int i = 8; i >= 0; i--)
+ {
+ m = consumer.receive(500);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(i, m.getPriority());
+
+ m.acknowledge();
+ }
+
+ consumer.close();
+
+ session.deleteQueue(queue);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -51,6 +51,7 @@
final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) HornetQClient.createClientSessionFactory(new TransportConfiguration("org.hornetq.integration.transports.netty.NettyConnectorFactory"));
sf.setReconnectAttempts(-1);
sf.setConfirmationWindowSize(1024 * 1024);
+ sf.setAckBatchSize(0);
return sf;
}
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -881,10 +881,10 @@
protected ClientMessage createTextMessage(final String s, final boolean durable, final ClientSession clientSession)
{
ClientMessage message = clientSession.createMessage(HornetQTextMessage.TYPE,
- durable,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ durable,
+ 0,
+ System.currentTimeMillis(),
+ (byte)4);
message.getBodyBuffer().writeString(s);
return message;
}
More information about the hornetq-commits
mailing list