From do-not-reply at jboss.org Wed Apr 7 08:12:03 2010 Content-Type: multipart/mixed; boundary="===============7679837812496739331==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r9066 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories. Date: Wed, 07 Apr 2010 08:12:03 -0400 Message-ID: <201004071212.o37CC3tl006303@svn01.web.mwc.hst.phx2.redhat.com> --===============7679837812496739331== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: timfox Date: 2010-04-07 08:12:01 -0400 (Wed, 07 Apr 2010) New Revision: 9066 Added: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExp= ireMessage.java trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionInd= ividualAcknowledgeMessage.java Removed: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExp= iredMessage.java Modified: trunk/docs/user-manual/en/client-reconnection.xml trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler= .java trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAck= nowledgeMessage.java trunk/src/main/org/hornetq/core/server/ServerConsumer.java trunk/src/main/org/hornetq/core/server/ServerSession.java trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest= .java trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMult= iThreadRandomReattachTest.java trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java Log: https://jira.jboss.org/jira/browse/HORNETQ-275 Modified: trunk/docs/user-manual/en/client-reconnection.xml =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 08:35:35 U= TC (rev 9065) +++ trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 12:12:01 U= TC (rev 9066) @@ -50,11 +50,10 @@ instance using the appropriate setter method. If you're using core you can set these values directly on th= e ClientSessionFactory instance using the appropr= iate setter method. - The window is specified in bytes, and has a default value of= 1MiB. + The window is specified in bytes. Setting this parameter to -1 disables any= buffering and prevents any re-attachment from occurring, forcing reconnect instead. T= he default value for this - parameter is -1. + parameter is -1. (Which means by default no= auto re-attachment will occur)
Session reconnection Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.ja= va =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 201= 0-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 201= 0-04-07 12:12:01 UTC (rev 9066) @@ -79,7 +79,8 @@ = private final int ackBatchSize; = - private final PriorityLinkedList buffer =3D new = PriorityLinkedListImpl(false, ClientConsumerImpl.NUM= _PRIORITIES); + private final PriorityLinkedList buffer =3D new = PriorityLinkedListImpl(false, + = ClientConsumerImpl.NUM_PRIORITI= ES); = private final Runner runner =3D new Runner(); = @@ -114,9 +115,11 @@ private boolean stopped =3D false; = private final AtomicLong forceDeliveryCount =3D new AtomicLong(0); - = + private final SessionQueueQueryResponseMessage queueInfo; = + private volatile boolean ackIndividually; + // Constructors // --------------------------------------------------------------------= ------------- = @@ -151,7 +154,7 @@ this.clientWindowSize =3D clientWindowSize; = this.ackBatchSize =3D ackBatchSize; - = + this.queueInfo =3D queueInfo; } = @@ -192,7 +195,7 @@ // Effectively infinite timeout =3D Long.MAX_VALUE; } - = + boolean deliveryForced =3D false; = long start =3D -1; @@ -414,6 +417,8 @@ lastAckedMessage =3D null; = creditsToSend =3D 0; + = + ackIndividually =3D false; } = public synchronized void start() @@ -435,7 +440,7 @@ { return queueInfo; } - = + public long getID() { return id; @@ -463,16 +468,24 @@ // This is ok - we just ignore the message return; } - = + ClientMessageInternal messageToHandle =3D message; - = + if (messageToHandle.getAddress() =3D=3D null) { messageToHandle.setAddressTransient(queueInfo.getAddress()); } = messageToHandle.onReceipt(this); - = + + if (message.getPriority() !=3D 4) + { + // We have messages of different priorities so we need to ack the= m individually since the order + // of them in the ServerConsumerImpl delivery list might not be t= he same as the order they are + // consumed in, which means that acking all up to won't work + ackIndividually =3D true; + } + // Add it to the buffer buffer.addLast(messageToHandle, messageToHandle.getPriority()); = @@ -546,9 +559,9 @@ // Need to send credits for the messages in the buffer = HQIterator iter =3D buffer.iterator(); - = + ClientMessageInternal message; - = + while ((message =3D iter.next()) !=3D null) { flowControlBeforeConsumption(message); @@ -575,16 +588,28 @@ public void acknowledge(final ClientMessage message) throws HornetQExce= ption { ClientMessageInternal cmi =3D (ClientMessageInternal)message; - = - ackBytes +=3D message.getEncodeSize(); = - if (ackBytes >=3D ackBatchSize) + if (ackIndividually) { - doAck(cmi); + if (lastAckedMessage !=3D null) + { + flushAcks(); + } + = + session.individualAcknowledge(id, message.getMessageID()); } else { - lastAckedMessage =3D cmi; + ackBytes +=3D message.getEncodeSize(); + + if (ackBytes >=3D ackBatchSize) + { + doAck(cmi); + } + else + { + lastAckedMessage =3D cmi; + } } } = Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010= -04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010= -04-07 12:12:01 UTC (rev 9066) @@ -51,8 +51,9 @@ import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowC= reditMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsume= rMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMe= ssage; -import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessag= e; +import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumer= Delivery; +import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAck= nowledgeMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMes= sage; import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryRes= ponseMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContin= uationMessage; @@ -576,10 +577,10 @@ } = public ClientMessage createMessage(final byte type, - final boolean durable, - final long expiration, - final long timestamp, - final byte priority) + final boolean durable, + final long expiration, + final long timestamp, + final byte priority) { return new ClientMessageImpl(type, durable, expiration, timestamp, p= riority, initialMessagePacketSize); } @@ -712,6 +713,30 @@ } } = + public void individualAcknowledge(final long consumerID, final long mes= sageID) throws HornetQException + { + // if we're pre-acknowledging then we don't need to do anything + if (preAcknowledge) + { + return; + } + + checkClosed(); + + SessionIndividualAcknowledgeMessage message =3D new SessionIndividua= lAcknowledgeMessage(consumerID, + = messageID, + = blockOnAcknowledge); + + if (blockOnAcknowledge) + { + channel.sendBlocking(message); + } + else + { + channel.sendBatched(message); + } + } + public void expire(final long consumerID, final long messageID) throws = HornetQException { checkClosed(); @@ -719,7 +744,7 @@ // We don't send expiries for pre-ack since message will already hav= e been acked on server if (!preAcknowledge) { - SessionExpiredMessage message =3D new SessionExpiredMessage(consu= merID, messageID); + SessionExpireMessage message =3D new SessionExpireMessage(consume= rID, messageID); = channel.send(message); } @@ -851,9 +876,9 @@ backupConnection.syncIDGeneratorSequence(remotingConnection.getID= GeneratorSequence()); = remotingConnection =3D backupConnection; - = + int lcid =3D channel.getLastConfirmedCommandID(); - = + Packet request =3D new ReattachSessionMessage(name, lcid); = Channel channel1 =3D backupConnection.getChannel(1, -1); @@ -864,11 +889,11 @@ { // The session was found on the server - we reattached transpa= rently ok = - channel.replayCommands(response.getLastConfirmedCommandID()); = = + channel.replayCommands(response.getLastConfirmedCommandID()); } else { - = + // The session wasn't found on the server - probably we're fai= ling over onto a backup server where the // session won't exist or the target server has been restarted= - in this case the session will need to be // recreated, @@ -892,7 +917,8 @@ autoCommitA= cks, preAcknowle= dge, confirmatio= nWindowSize, - defaultAddr= ess =3D=3D null ? null : defaultAddress.toString()); + defaultAddr= ess =3D=3D null ? null + = : defaultAddress.toString()); boolean retry =3D false; do { @@ -924,7 +950,7 @@ for (Map.Entry entry : consum= ers.entrySet()) { SessionQueueQueryResponseMessage queueInfo =3D entry.get= Value().getQueueInfo(); - = + // We try and recreate any non durable queues, since the= y probably won't be there unless // they are defined in hornetq-configuration.xml // This allows e.g. JMS non durable subs and temporary q= ueues to continue to be used after failover @@ -950,7 +976,7 @@ = false); = sendPacketWithoutLock(createConsumerRequest); - = + int clientWindowSize =3D entry.getValue().getClientWindo= wSize(); = if (clientWindowSize !=3D 0) @@ -995,7 +1021,7 @@ channel.returnBlocking(); } = - channel.setTransferring(false); = + channel.setTransferring(false); } catch (Throwable t) { @@ -1014,15 +1040,15 @@ // not having any credits to send } } - = + private volatile SimpleString defaultAddress; - = + public void setAddress(final Message message, final SimpleString addres= s) { if (defaultAddress =3D=3D null) { defaultAddress =3D address; - = + message.setAddress(address); } else @@ -1037,9 +1063,7 @@ } } } - = - = - = + public void setPacketSize(final int packetSize) { if (packetSize > this.initialMessagePacketSize) @@ -1083,7 +1107,7 @@ { return producerCreditManager.getCredits(address, anon); } - = + public void returnCredits(final SimpleString address) { producerCreditManager.returnCredits(address); @@ -1093,7 +1117,7 @@ { producerCreditManager.receiveCredits(address, credits); } - = + public ClientProducerCreditManager getProducerCreditManager() { return producerCreditManager; Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal= .java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java = 2010-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java = 2010-04-07 12:12:01 UTC (rev 9066) @@ -33,6 +33,8 @@ String getName(); = void acknowledge(long consumerID, long messageID) throws HornetQExcepti= on; + = + void individualAcknowledge(long consumerID, long messageID) throws Horn= etQException; = boolean isCacheLargeMessageClient(); = Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010= -04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010= -04-07 12:12:01 UTC (rev 9066) @@ -103,6 +103,11 @@ { session.acknowledge(consumerID, messageID); } + = + public void individualAcknowledge(final long consumerID, final long mes= sageID) throws HornetQException + { + session.individualAcknowledge(consumerID, messageID); + } = public void addConsumer(final ClientConsumerInternal consumer) { Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacket= Handler.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle= r.java 2010-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle= r.java 2010-04-07 12:12:01 UTC (rev 9066) @@ -24,6 +24,7 @@ import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKE= N; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CO= NSUMER_DELIVERY; +import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDU= AL_ACKNOWLEDGE; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUE= RY; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_SEND; @@ -67,8 +68,9 @@ import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowC= reditMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsume= rMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMe= ssage; -import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessag= e; +import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumer= Delivery; +import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAck= nowledgeMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMes= sage; import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryRes= ponseMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProduc= erCreditsMessage; @@ -281,7 +283,7 @@ } case SESS_EXPIRED: { - SessionExpiredMessage message =3D (SessionExpiredMessage= )packet; + SessionExpireMessage message =3D (SessionExpireMessage)p= acket; session.expire(message.getConsumerID(), message.getMessa= geID()); break; } @@ -414,6 +416,17 @@ closeChannel =3D true; break; } + case SESS_INDIVIDUAL_ACKNOWLEDGE: + { + SessionIndividualAcknowledgeMessage message =3D (Session= IndividualAcknowledgeMessage)packet; + requiresResponse =3D message.isRequiresResponse(); + session.individualAcknowledge(message.getConsumerID(), m= essage.getMessageID()); + if (requiresResponse) + { + response =3D new NullResponseMessage(); + } + break; + } case SESS_CONSUMER_CLOSE: { requiresResponse =3D true; Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.ja= va =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 201= 0-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 201= 0-04-07 12:12:01 UTC (rev 9066) @@ -188,13 +188,15 @@ { resendCache.add(packet); } - - connection.getTransportConnection().write(buffer, flush, batch= ); } finally { lock.unlock(); } + = + //The actual send must be outside the lock, or with OIO transport= , the write can block if the tcp + //buffer is full, preventing any incoming buffers being handled a= nd blocking failover + connection.getTransportConnection().write(buffer, flush, batch); } } = Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.= java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2= 010-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2= 010-04-07 12:12:01 UTC (rev 9066) @@ -48,6 +48,7 @@ import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKE= N; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CO= NSUMER_DELIVERY; +import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDU= AL_ACKNOWLEDGE; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER= _CREDITS; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER= _REQUEST_CREDITS; import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUE= RY; @@ -114,8 +115,9 @@ import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowC= reditMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsume= rMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMe= ssage; -import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessag= e; +import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumer= Delivery; +import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAck= nowledgeMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCredi= tsMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMes= sage; import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryRes= ponseMessage; @@ -216,7 +218,7 @@ } case SESS_EXPIRED: { - packet =3D new SessionExpiredMessage(); + packet =3D new SessionExpireMessage(); break; } case SESS_COMMIT: @@ -379,6 +381,11 @@ packet =3D new SessionConsumerCloseMessage(); break; } + case SESS_INDIVIDUAL_ACKNOWLEDGE: + { + packet =3D new SessionIndividualAcknowledgeMessage(); + break; + } case NULL_RESPONSE: { packet =3D new NullResponseMessage(); Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010= -04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010= -04-07 12:12:01 UTC (rev 9066) @@ -149,6 +149,8 @@ public static final byte SESS_PRODUCER_REQUEST_CREDITS =3D 79; = public static final byte SESS_PRODUCER_CREDITS =3D 80; + = + public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE =3D 81; = // Replication = Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Ses= sionAcknowledgeMessage.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAc= knowledgeMessage.java 2010-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAc= knowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066) @@ -29,7 +29,7 @@ private long consumerID; = private long messageID; - + = private boolean requiresResponse; = // Static -------------------------------------------------------- Copied: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Sessi= onExpireMessage.java (from rev 9052, trunk/src/main/org/hornetq/core/protoc= ol/core/impl/wireformat/SessionExpiredMessage.java) =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx= pireMessage.java (rev 0) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx= pireMessage.java 2010-04-07 12:12:01 UTC (rev 9066) @@ -0,0 +1,98 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.protocol.core.impl.wireformat; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.core.protocol.core.impl.PacketImpl; + +/** + * @author Tim Fox + * @version $Revision$ + */ +public class SessionExpireMessage extends PacketImpl +{ + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private long consumerID; + + private long messageID; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public SessionExpireMessage(final long consumerID, final long messageID) + { + super(PacketImpl.SESS_EXPIRED); + + this.consumerID =3D consumerID; + + this.messageID =3D messageID; + } + + public SessionExpireMessage() + { + super(PacketImpl.SESS_EXPIRED); + } + + // Public -------------------------------------------------------- + + public long getConsumerID() + { + return consumerID; + } + + public long getMessageID() + { + return messageID; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeLong(consumerID); + + buffer.writeLong(messageID); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + consumerID =3D buffer.readLong(); + + messageID =3D buffer.readLong(); + } + + @Override + public boolean equals(final Object other) + { + if (other instanceof SessionExpireMessage =3D=3D false) + { + return false; + } + + SessionExpireMessage r =3D (SessionExpireMessage)other; + + return super.equals(other) && consumerID =3D=3D r.consumerID && mess= ageID =3D=3D r.messageID; + } + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} Deleted: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Sess= ionExpiredMessage.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx= piredMessage.java 2010-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx= piredMessage.java 2010-04-07 12:12:01 UTC (rev 9066) @@ -1,98 +0,0 @@ -/* - * Copyright 2009 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.protocol.core.impl.wireformat; - -import org.hornetq.api.core.HornetQBuffer; -import org.hornetq.core.protocol.core.impl.PacketImpl; - -/** - * @author Tim Fox - * @version $Revision$ - */ -public class SessionExpiredMessage extends PacketImpl -{ - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private long consumerID; - - private long messageID; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - public SessionExpiredMessage(final long consumerID, final long messageI= D) - { - super(PacketImpl.SESS_EXPIRED); - - this.consumerID =3D consumerID; - - this.messageID =3D messageID; - } - - public SessionExpiredMessage() - { - super(PacketImpl.SESS_EXPIRED); - } - - // Public -------------------------------------------------------- - - public long getConsumerID() - { - return consumerID; - } - - public long getMessageID() - { - return messageID; - } - - @Override - public void encodeRest(final HornetQBuffer buffer) - { - buffer.writeLong(consumerID); - - buffer.writeLong(messageID); - } - - @Override - public void decodeRest(final HornetQBuffer buffer) - { - consumerID =3D buffer.readLong(); - - messageID =3D buffer.readLong(); - } - - @Override - public boolean equals(final Object other) - { - if (other instanceof SessionExpiredMessage =3D=3D false) - { - return false; - } - - SessionExpiredMessage r =3D (SessionExpiredMessage)other; - - return super.equals(other) && consumerID =3D=3D r.consumerID && mess= ageID =3D=3D r.messageID; - } - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- -} Added: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Sessio= nIndividualAcknowledgeMessage.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIn= dividualAcknowledgeMessage.java (rev 0) +++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIn= dividualAcknowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066) @@ -0,0 +1,113 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.protocol.core.impl.wireformat; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.core.protocol.core.impl.PacketImpl; + +/** + * @author Tim Fox + * @version $Revision$ + */ +public class SessionIndividualAcknowledgeMessage extends PacketImpl +{ + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private long consumerID; + + private long messageID; + = + private boolean requiresResponse; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public SessionIndividualAcknowledgeMessage(final long consumerID, final= long messageID, final boolean requiresResponse) + { + super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE); + + this.consumerID =3D consumerID; + + this.messageID =3D messageID; + + this.requiresResponse =3D requiresResponse; + } + + public SessionIndividualAcknowledgeMessage() + { + super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE); + } + + // Public -------------------------------------------------------- + + public long getConsumerID() + { + return consumerID; + } + + public long getMessageID() + { + return messageID; + } + + public boolean isRequiresResponse() + { + return requiresResponse; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeLong(consumerID); + + buffer.writeLong(messageID); + + buffer.writeBoolean(requiresResponse); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + consumerID =3D buffer.readLong(); + + messageID =3D buffer.readLong(); + + requiresResponse =3D buffer.readBoolean(); + } + + @Override + public boolean equals(final Object other) + { + if (other instanceof SessionIndividualAcknowledgeMessage =3D=3D fals= e) + { + return false; + } + + SessionIndividualAcknowledgeMessage r =3D (SessionIndividualAcknowle= dgeMessage)other; + + return super.equals(other) && consumerID =3D=3D r.consumerID && + messageID =3D=3D r.messageID && + requiresResponse =3D=3D r.requiresResponse; + } + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 0= 8:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 1= 2:12:01 UTC (rev 9066) @@ -38,9 +38,11 @@ = Queue getQueue(); = - MessageReference getExpired(long messageID) throws Exception; + MessageReference removeReferenceByID(long messageID) throws Exception; = void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID= ) throws Exception; + = + void individualAcknowledge(boolean autoCommitAcks, Transaction tx, long= messageID) throws Exception; = void forceDelivery(long sequence); = = Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 08= :35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 12= :12:01 UTC (rev 9066) @@ -18,7 +18,6 @@ import javax.transaction.xa.Xid; = import org.hornetq.api.core.SimpleString; -import org.hornetq.spi.core.protocol.SessionCallback; = /** * @@ -43,6 +42,8 @@ void removeConsumer(long consumerID) throws Exception; = void acknowledge(long consumerID, long messageID) throws Exception; + = + void individualAcknowledge(long consumerID, long messageID) throws Exce= ption; = void expire(long consumerID, long messageID) throws Exception; = Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.ja= va =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 201= 0-04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 201= 0-04-07 12:12:01 UTC (rev 9066) @@ -517,8 +517,32 @@ } while (ref.getMessage().getMessageID() !=3D messageID); } + = + public void individualAcknowledge(final boolean autoCommitAcks, final T= ransaction tx, final long messageID) throws Exception + { + if (browseOnly) + { + return; + } + = + MessageReference ref =3D removeReferenceByID(messageID); + = + if (ref =3D=3D null) + { + throw new IllegalStateException("Cannot find ref to ack " + messa= geID); + } + = + if (autoCommitAcks) + { + ref.getQueue().acknowledge(ref); + } + else + { + ref.getQueue().acknowledge(tx, ref); + } + } = - public MessageReference getExpired(final long messageID) throws Excepti= on + public MessageReference removeReferenceByID(final long messageID) throw= s Exception { if (browseOnly) { Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010= -04-07 08:35:35 UTC (rev 9065) +++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010= -04-07 12:12:01 UTC (rev 9066) @@ -473,10 +473,22 @@ = consumer.acknowledge(autoCommitAcks, tx, messageID); } + = + public void individualAcknowledge(final long consumerID, final long mes= sageID) throws Exception + { + ServerConsumer consumer =3D consumers.get(consumerID); + = + if (this.xa && tx =3D=3D null) + { + throw new HornetQXAException(XAException.XAER_PROTO, "Invalid tra= nsaction state"); + } = + consumer.individualAcknowledge(autoCommitAcks, tx, messageID); + } + public void expire(final long consumerID, final long messageID) throws = Exception { - MessageReference ref =3D consumers.get(consumerID).getExpired(messag= eID); + MessageReference ref =3D consumers.get(consumerID).removeReferenceBy= ID(messageID); = if (ref !=3D null) { Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessagePrior= ityTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTes= t.java 2010-04-07 08:35:35 UTC (rev 9065) +++ trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTes= t.java 2010-04-07 12:12:01 UTC (rev 9066) @@ -201,7 +201,68 @@ session.deleteQueue(queue); = } + = + // https://jira.jboss.org/jira/browse/HORNETQ-275 + public void testOutOfOrderAcknowledgement() throws Exception + { + SimpleString queue =3D RandomUtil.randomSimpleString(); + SimpleString address =3D RandomUtil.randomSimpleString(); = + session.createQueue(address, queue, false); + + ClientProducer producer =3D session.createProducer(address); + + ClientConsumer consumer =3D session.createConsumer(queue); + + session.start(); + + for (int i =3D 0; i < 10; i++) + { + ClientMessage m =3D createTextMessage(Integer.toString(i), sessio= n); + m.setPriority((byte)i); + producer.send(m); + + Thread.sleep(20); + } + + // Now we wait a little bit to make sure the messages are in the cli= ent side buffer + + // They should have been added to the delivering list in the ServerC= onsumerImpl in the order + // they were sent, not priority order + + //We receive one of the messages + ClientMessage m =3D consumer.receive(500); + Assert.assertNotNull(m); + Assert.assertEquals(9, m.getPriority()); + + //Ack it + m.acknowledge(); + + consumer.close(); + = + //Close and try and receive the other ones + + consumer =3D session.createConsumer(queue); + + // Other messages should be received now + // Previously there was a bug whereby if deliveries were stored on s= erver side in send order + // then if received in priority order, and acked + // the ack would ack all messages up to the one received - resulting= in acking + // messages that hadn't been delivered yet + for (int i =3D 8; i >=3D 0; i--) + { + m =3D consumer.receive(500); + Assert.assertNotNull(m); + Assert.assertEquals(i, m.getPriority()); + + m.acknowledge(); + } + = + consumer.close(); + + session.deleteQueue(queue); + } + // Package protected --------------------------------------------- = // Protected ----------------------------------------------------- Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/Ne= ttyMultiThreadRandomReattachTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMul= tiThreadRandomReattachTest.java 2010-04-07 08:35:35 UTC (rev 9065) +++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMul= tiThreadRandomReattachTest.java 2010-04-07 12:12:01 UTC (rev 9066) @@ -51,6 +51,7 @@ final ClientSessionFactoryInternal sf =3D (ClientSessionFactoryInter= nal) HornetQClient.createClientSessionFactory(new TransportConfiguration("o= rg.hornetq.integration.transports.netty.NettyConnectorFactory")); sf.setReconnectAttempts(-1); sf.setConfirmationWindowSize(1024 * 1024); + sf.setAckBatchSize(0); return sf; } = Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 08:= 35:35 UTC (rev 9065) +++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 12:= 12:01 UTC (rev 9066) @@ -881,10 +881,10 @@ protected ClientMessage createTextMessage(final String s, final boolean= durable, final ClientSession clientSession) { ClientMessage message =3D clientSession.createMessage(HornetQTextMes= sage.TYPE, - durable, - 0, - System.cur= rentTimeMillis(), - (byte)1); + durable, + 0, + System.currentTi= meMillis(), + (byte)4); message.getBodyBuffer().writeString(s); return message; } --===============7679837812496739331==--