From do-not-reply at jboss.org Wed Apr 7 08:12:03 2010
Content-Type: multipart/mixed; boundary="===============5722681955605955828=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r9066 - in trunk:
src/main/org/hornetq/core/client/impl and 8 other directories.
Date: Wed, 07 Apr 2010 08:12:03 -0400
Message-ID: <201004071212.o37CC3tl006303@svn01.web.mwc.hst.phx2.redhat.com>
--===============5722681955605955828==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: timfox
Date: 2010-04-07 08:12:01 -0400 (Wed, 07 Apr 2010)
New Revision: 9066
Added:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExp=
ireMessage.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionInd=
ividualAcknowledgeMessage.java
Removed:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionExp=
iredMessage.java
Modified:
trunk/docs/user-manual/en/client-reconnection.xml
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler=
.java
trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAck=
nowledgeMessage.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest=
.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMult=
iThreadRandomReattachTest.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-275
Modified: trunk/docs/user-manual/en/client-reconnection.xml
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 08:35:35 U=
TC (rev 9065)
+++ trunk/docs/user-manual/en/client-reconnection.xml 2010-04-07 12:12:01 U=
TC (rev 9066)
@@ -50,11 +50,10 @@
instance using the appropriate setter method.
If you're using core you can set these values directly on th=
e ClientSessionFactory instance using the appropr=
iate setter method.
- The window is specified in bytes, and has a default value of=
1MiB.
+ The window is specified in bytes.
Setting this parameter to -1 disables any=
buffering and prevents
any re-attachment from occurring, forcing reconnect instead. T=
he default value for this
- parameter is -1.
+ parameter is -1. (This means that, by default, no=
 automatic re-attachment will occur.)
Session reconnection
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.ja=
va
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 201=
0-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 201=
0-04-07 12:12:01 UTC (rev 9066)
@@ -79,7 +79,8 @@
=
private final int ackBatchSize;
=
- private final PriorityLinkedList buffer =3D new =
PriorityLinkedListImpl(false, ClientConsumerImpl.NUM=
_PRIORITIES);
+ private final PriorityLinkedList buffer =3D new =
PriorityLinkedListImpl(false,
+ =
ClientConsumerImpl.NUM_PRIORITI=
ES);
=
private final Runner runner =3D new Runner();
=
@@ -114,9 +115,11 @@
private boolean stopped =3D false;
=
private final AtomicLong forceDeliveryCount =3D new AtomicLong(0);
- =
+
private final SessionQueueQueryResponseMessage queueInfo;
=
+ private volatile boolean ackIndividually;
+
// Constructors
// --------------------------------------------------------------------=
-------------
=
@@ -151,7 +154,7 @@
this.clientWindowSize =3D clientWindowSize;
=
this.ackBatchSize =3D ackBatchSize;
- =
+
this.queueInfo =3D queueInfo;
}
=
@@ -192,7 +195,7 @@
// Effectively infinite
timeout =3D Long.MAX_VALUE;
}
- =
+
boolean deliveryForced =3D false;
=
long start =3D -1;
@@ -414,6 +417,8 @@
lastAckedMessage =3D null;
=
creditsToSend =3D 0;
+ =
+ ackIndividually =3D false;
}
=
public synchronized void start()
@@ -435,7 +440,7 @@
{
return queueInfo;
}
- =
+
public long getID()
{
return id;
@@ -463,16 +468,24 @@
// This is ok - we just ignore the message
return;
}
- =
+
ClientMessageInternal messageToHandle =3D message;
- =
+
if (messageToHandle.getAddress() =3D=3D null)
{
messageToHandle.setAddressTransient(queueInfo.getAddress());
}
=
messageToHandle.onReceipt(this);
- =
+
+ if (message.getPriority() !=3D 4)
+ {
+ // We have messages of different priorities so we need to ack the=
m individually, since their order
+ // in the ServerConsumerImpl delivery list might differ from the or=
der in which they are
+ // consumed, which means that a cumulative "ack all up to" won't work
+ ackIndividually =3D true;
+ }
+
// Add it to the buffer
buffer.addLast(messageToHandle, messageToHandle.getPriority());
=
@@ -546,9 +559,9 @@
// Need to send credits for the messages in the buffer
=
HQIterator iter =3D buffer.iterator();
- =
+
ClientMessageInternal message;
- =
+
while ((message =3D iter.next()) !=3D null)
{
flowControlBeforeConsumption(message);
@@ -575,16 +588,28 @@
public void acknowledge(final ClientMessage message) throws HornetQExce=
ption
{
ClientMessageInternal cmi =3D (ClientMessageInternal)message;
- =
- ackBytes +=3D message.getEncodeSize();
=
- if (ackBytes >=3D ackBatchSize)
+ if (ackIndividually)
{
- doAck(cmi);
+ if (lastAckedMessage !=3D null)
+ {
+ flushAcks();
+ }
+ =
+ session.individualAcknowledge(id, message.getMessageID());
}
else
{
- lastAckedMessage =3D cmi;
+ ackBytes +=3D message.getEncodeSize();
+
+ if (ackBytes >=3D ackBatchSize)
+ {
+ doAck(cmi);
+ }
+ else
+ {
+ lastAckedMessage =3D cmi;
+ }
}
}
=
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010=
-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010=
-04-07 12:12:01 UTC (rev 9066)
@@ -51,8 +51,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowC=
reditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsume=
rMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMe=
ssage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessag=
e;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumer=
Delivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAck=
nowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMes=
sage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryRes=
ponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContin=
uationMessage;
@@ -576,10 +577,10 @@
}
=
public ClientMessage createMessage(final byte type,
- final boolean durable,
- final long expiration,
- final long timestamp,
- final byte priority)
+ final boolean durable,
+ final long expiration,
+ final long timestamp,
+ final byte priority)
{
return new ClientMessageImpl(type, durable, expiration, timestamp, p=
riority, initialMessagePacketSize);
}
@@ -712,6 +713,30 @@
}
}
=
+ public void individualAcknowledge(final long consumerID, final long mes=
sageID) throws HornetQException
+ {
+ // if we're pre-acknowledging then we don't need to do anything
+ if (preAcknowledge)
+ {
+ return;
+ }
+
+ checkClosed();
+
+ SessionIndividualAcknowledgeMessage message =3D new SessionIndividua=
lAcknowledgeMessage(consumerID,
+ =
messageID,
+ =
blockOnAcknowledge);
+
+ if (blockOnAcknowledge)
+ {
+ channel.sendBlocking(message);
+ }
+ else
+ {
+ channel.sendBatched(message);
+ }
+ }
+
public void expire(final long consumerID, final long messageID) throws =
HornetQException
{
checkClosed();
@@ -719,7 +744,7 @@
// We don't send expiries for pre-ack since message will already hav=
e been acked on server
if (!preAcknowledge)
{
- SessionExpiredMessage message =3D new SessionExpiredMessage(consu=
merID, messageID);
+ SessionExpireMessage message =3D new SessionExpireMessage(consume=
rID, messageID);
=
channel.send(message);
}
@@ -851,9 +876,9 @@
backupConnection.syncIDGeneratorSequence(remotingConnection.getID=
GeneratorSequence());
=
remotingConnection =3D backupConnection;
- =
+
int lcid =3D channel.getLastConfirmedCommandID();
- =
+
Packet request =3D new ReattachSessionMessage(name, lcid);
=
Channel channel1 =3D backupConnection.getChannel(1, -1);
@@ -864,11 +889,11 @@
{
// The session was found on the server - we reattached transpa=
rently ok
=
- channel.replayCommands(response.getLastConfirmedCommandID()); =
=
+ channel.replayCommands(response.getLastConfirmedCommandID());
}
else
{
- =
+
// The session wasn't found on the server - probably we're fai=
ling over onto a backup server where the
// session won't exist or the target server has been restarted=
- in this case the session will need to be
// recreated,
@@ -892,7 +917,8 @@
autoCommitA=
cks,
preAcknowle=
dge,
confirmatio=
nWindowSize,
- defaultAddr=
ess =3D=3D null ? null : defaultAddress.toString());
+ defaultAddr=
ess =3D=3D null ? null
+ =
: defaultAddress.toString());
boolean retry =3D false;
do
{
@@ -924,7 +950,7 @@
for (Map.Entry entry : consum=
ers.entrySet())
{
SessionQueueQueryResponseMessage queueInfo =3D entry.get=
Value().getQueueInfo();
- =
+
// We try and recreate any non durable queues, since the=
y probably won't be there unless
// they are defined in hornetq-configuration.xml
// This allows e.g. JMS non durable subs and temporary q=
ueues to continue to be used after failover
@@ -950,7 +976,7 @@
=
false);
=
sendPacketWithoutLock(createConsumerRequest);
- =
+
int clientWindowSize =3D entry.getValue().getClientWindo=
wSize();
=
if (clientWindowSize !=3D 0)
@@ -995,7 +1021,7 @@
channel.returnBlocking();
}
=
- channel.setTransferring(false); =
+ channel.setTransferring(false);
}
catch (Throwable t)
{
@@ -1014,15 +1040,15 @@
// not having any credits to send
}
}
- =
+
private volatile SimpleString defaultAddress;
- =
+
public void setAddress(final Message message, final SimpleString addres=
s)
{
if (defaultAddress =3D=3D null)
{
defaultAddress =3D address;
- =
+
message.setAddress(address);
}
else
@@ -1037,9 +1063,7 @@
}
}
}
- =
- =
- =
+
public void setPacketSize(final int packetSize)
{
if (packetSize > this.initialMessagePacketSize)
@@ -1083,7 +1107,7 @@
{
return producerCreditManager.getCredits(address, anon);
}
- =
+
public void returnCredits(final SimpleString address)
{
producerCreditManager.returnCredits(address);
@@ -1093,7 +1117,7 @@
{
producerCreditManager.receiveCredits(address, credits);
}
- =
+
public ClientProducerCreditManager getProducerCreditManager()
{
return producerCreditManager;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal=
.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java =
2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java =
2010-04-07 12:12:01 UTC (rev 9066)
@@ -33,6 +33,8 @@
String getName();
=
void acknowledge(long consumerID, long messageID) throws HornetQExcepti=
on;
+ =
+ void individualAcknowledge(long consumerID, long messageID) throws Horn=
etQException;
=
boolean isCacheLargeMessageClient();
=
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010=
-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010=
-04-07 12:12:01 UTC (rev 9066)
@@ -103,6 +103,11 @@
{
session.acknowledge(consumerID, messageID);
}
+ =
+ public void individualAcknowledge(final long consumerID, final long mes=
sageID) throws HornetQException
+ {
+ session.individualAcknowledge(consumerID, messageID);
+ }
=
public void addConsumer(final ClientConsumerInternal consumer)
{
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacket=
Handler.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle=
r.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandle=
r.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -24,6 +24,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKE=
N;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CO=
NSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDU=
AL_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUE=
RY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_SEND;
@@ -67,8 +68,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowC=
reditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsume=
rMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMe=
ssage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessag=
e;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumer=
Delivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAck=
nowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMes=
sage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryRes=
ponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProduc=
erCreditsMessage;
@@ -281,7 +283,7 @@
}
case SESS_EXPIRED:
{
- SessionExpiredMessage message =3D (SessionExpiredMessage=
)packet;
+ SessionExpireMessage message =3D (SessionExpireMessage)p=
acket;
session.expire(message.getConsumerID(), message.getMessa=
geID());
break;
}
@@ -414,6 +416,17 @@
closeChannel =3D true;
break;
}
+ case SESS_INDIVIDUAL_ACKNOWLEDGE:
+ {
+ SessionIndividualAcknowledgeMessage message =3D (Session=
IndividualAcknowledgeMessage)packet;
+ requiresResponse =3D message.isRequiresResponse();
+ session.individualAcknowledge(message.getConsumerID(), m=
essage.getMessageID());
+ if (requiresResponse)
+ {
+ response =3D new NullResponseMessage();
+ }
+ break;
+ }
case SESS_CONSUMER_CLOSE:
{
requiresResponse =3D true;
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.ja=
va
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 201=
0-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 201=
0-04-07 12:12:01 UTC (rev 9066)
@@ -188,13 +188,15 @@
{
resendCache.add(packet);
}
-
- connection.getTransportConnection().write(buffer, flush, batch=
);
}
finally
{
lock.unlock();
}
+ =
+ // The actual send must happen outside the lock; otherwise, with OIO t=
ransport, the write can block if the TCP
+ // buffer is full, preventing any incoming buffers from being handled a=
nd blocking failover
+ connection.getTransportConnection().write(buffer, flush, batch);
}
}
=
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.=
java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2=
010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2=
010-04-07 12:12:01 UTC (rev 9066)
@@ -48,6 +48,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKE=
N;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_FORCE_CO=
NSUMER_DELIVERY;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_INDIVIDU=
AL_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER=
_CREDITS;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_PRODUCER=
_REQUEST_CREDITS;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUE=
RY;
@@ -114,8 +115,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowC=
reditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsume=
rMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMe=
ssage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessag=
e;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumer=
Delivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAck=
nowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCredi=
tsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMes=
sage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryRes=
ponseMessage;
@@ -216,7 +218,7 @@
}
case SESS_EXPIRED:
{
- packet =3D new SessionExpiredMessage();
+ packet =3D new SessionExpireMessage();
break;
}
case SESS_COMMIT:
@@ -379,6 +381,11 @@
packet =3D new SessionConsumerCloseMessage();
break;
}
+ case SESS_INDIVIDUAL_ACKNOWLEDGE:
+ {
+ packet =3D new SessionIndividualAcknowledgeMessage();
+ break;
+ }
case NULL_RESPONSE:
{
packet =3D new NullResponseMessage();
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010=
-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010=
-04-07 12:12:01 UTC (rev 9066)
@@ -149,6 +149,8 @@
public static final byte SESS_PRODUCER_REQUEST_CREDITS =3D 79;
=
public static final byte SESS_PRODUCER_CREDITS =3D 80;
+ =
+ public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE =3D 81;
=
// Replication
=
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Ses=
sionAcknowledgeMessage.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAc=
knowledgeMessage.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAc=
knowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -29,7 +29,7 @@
private long consumerID;
=
private long messageID;
-
+ =
private boolean requiresResponse;
=
// Static --------------------------------------------------------
Copied: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Sessi=
onExpireMessage.java (from rev 9052, trunk/src/main/org/hornetq/core/protoc=
ol/core/impl/wireformat/SessionExpiredMessage.java)
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx=
pireMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx=
pireMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author Tim Fox
+ * @version $Revision$
+ */
+public class SessionExpireMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+
+ private long messageID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionExpireMessage(final long consumerID, final long messageID)
+ {
+ super(PacketImpl.SESS_EXPIRED);
+
+ this.consumerID =3D consumerID;
+
+ this.messageID =3D messageID;
+ }
+
+ public SessionExpireMessage()
+ {
+ super(PacketImpl.SESS_EXPIRED);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+
+ buffer.writeLong(messageID);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ consumerID =3D buffer.readLong();
+
+ messageID =3D buffer.readLong();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof SessionExpireMessage =3D=3D false)
+ {
+ return false;
+ }
+
+ SessionExpireMessage r =3D (SessionExpireMessage)other;
+
+ return super.equals(other) && consumerID =3D=3D r.consumerID && mess=
ageID =3D=3D r.messageID;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Sess=
ionExpiredMessage.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx=
piredMessage.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionEx=
piredMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -1,98 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author Tim Fox
- * @version $Revision$
- */
-public class SessionExpiredMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long consumerID;
-
- private long messageID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionExpiredMessage(final long consumerID, final long messageI=
D)
- {
- super(PacketImpl.SESS_EXPIRED);
-
- this.consumerID =3D consumerID;
-
- this.messageID =3D messageID;
- }
-
- public SessionExpiredMessage()
- {
- super(PacketImpl.SESS_EXPIRED);
- }
-
- // Public --------------------------------------------------------
-
- public long getConsumerID()
- {
- return consumerID;
- }
-
- public long getMessageID()
- {
- return messageID;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeLong(consumerID);
-
- buffer.writeLong(messageID);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- consumerID =3D buffer.readLong();
-
- messageID =3D buffer.readLong();
- }
-
- @Override
- public boolean equals(final Object other)
- {
- if (other instanceof SessionExpiredMessage =3D=3D false)
- {
- return false;
- }
-
- SessionExpiredMessage r =3D (SessionExpiredMessage)other;
-
- return super.equals(other) && consumerID =3D=3D r.consumerID && mess=
ageID =3D=3D r.messageID;
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/Sessio=
nIndividualAcknowledgeMessage.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIn=
dividualAcknowledgeMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionIn=
dividualAcknowledgeMessage.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author Tim Fox
+ * @version $Revision$
+ */
+public class SessionIndividualAcknowledgeMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+
+ private long messageID;
+ =
+ private boolean requiresResponse;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionIndividualAcknowledgeMessage(final long consumerID, final=
long messageID, final boolean requiresResponse)
+ {
+ super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+
+ this.consumerID =3D consumerID;
+
+ this.messageID =3D messageID;
+
+ this.requiresResponse =3D requiresResponse;
+ }
+
+ public SessionIndividualAcknowledgeMessage()
+ {
+ super(PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(consumerID);
+
+ buffer.writeLong(messageID);
+
+ buffer.writeBoolean(requiresResponse);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ consumerID =3D buffer.readLong();
+
+ messageID =3D buffer.readLong();
+
+ requiresResponse =3D buffer.readBoolean();
+ }
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof SessionIndividualAcknowledgeMessage =3D=3D fals=
e)
+ {
+ return false;
+ }
+
+ SessionIndividualAcknowledgeMessage r =3D (SessionIndividualAcknowle=
dgeMessage)other;
+
+ return super.equals(other) && consumerID =3D=3D r.consumerID &&
+ messageID =3D=3D r.messageID &&
+ requiresResponse =3D=3D r.requiresResponse;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 0=
8:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-04-07 1=
2:12:01 UTC (rev 9066)
@@ -38,9 +38,11 @@
=
Queue getQueue();
=
- MessageReference getExpired(long messageID) throws Exception;
+ MessageReference removeReferenceByID(long messageID) throws Exception;
=
void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID=
) throws Exception;
+ =
+ void individualAcknowledge(boolean autoCommitAcks, Transaction tx, long=
messageID) throws Exception;
=
void forceDelivery(long sequence); =
=
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 08=
:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-04-07 12=
:12:01 UTC (rev 9066)
@@ -18,7 +18,6 @@
import javax.transaction.xa.Xid;
=
import org.hornetq.api.core.SimpleString;
-import org.hornetq.spi.core.protocol.SessionCallback;
=
/**
*
@@ -43,6 +42,8 @@
void removeConsumer(long consumerID) throws Exception;
=
void acknowledge(long consumerID, long messageID) throws Exception;
+ =
+ void individualAcknowledge(long consumerID, long messageID) throws Exce=
ption;
=
void expire(long consumerID, long messageID) throws Exception;
=
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.ja=
va
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 201=
0-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 201=
0-04-07 12:12:01 UTC (rev 9066)
@@ -517,8 +517,32 @@
}
while (ref.getMessage().getMessageID() !=3D messageID);
}
+ =
+ public void individualAcknowledge(final boolean autoCommitAcks, final T=
ransaction tx, final long messageID) throws Exception
+ {
+ if (browseOnly)
+ {
+ return;
+ }
+ =
+ MessageReference ref =3D removeReferenceByID(messageID);
+ =
+ if (ref =3D=3D null)
+ {
+ throw new IllegalStateException("Cannot find ref to ack " + messa=
geID);
+ }
+ =
+ if (autoCommitAcks)
+ {
+ ref.getQueue().acknowledge(ref);
+ }
+ else
+ {
+ ref.getQueue().acknowledge(tx, ref);
+ }
+ }
=
- public MessageReference getExpired(final long messageID) throws Excepti=
on
+ public MessageReference removeReferenceByID(final long messageID) throw=
s Exception
{
if (browseOnly)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010=
-04-07 08:35:35 UTC (rev 9065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010=
-04-07 12:12:01 UTC (rev 9066)
@@ -473,10 +473,22 @@
=
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
+ =
+ public void individualAcknowledge(final long consumerID, final long mes=
sageID) throws Exception
+ {
+ ServerConsumer consumer =3D consumers.get(consumerID);
+ =
+ if (this.xa && tx =3D=3D null)
+ {
+ throw new HornetQXAException(XAException.XAER_PROTO, "Invalid tra=
nsaction state");
+ }
=
+ consumer.individualAcknowledge(autoCommitAcks, tx, messageID);
+ }
+
public void expire(final long consumerID, final long messageID) throws =
Exception
{
- MessageReference ref =3D consumers.get(consumerID).getExpired(messag=
eID);
+ MessageReference ref =3D consumers.get(consumerID).removeReferenceBy=
ID(messageID);
=
if (ref !=3D null)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessagePrior=
ityTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTes=
t.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessagePriorityTes=
t.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -201,7 +201,68 @@
session.deleteQueue(queue);
=
}
+ =
+ // https://jira.jboss.org/jira/browse/HORNETQ-275
+ public void testOutOfOrderAcknowledgement() throws Exception
+ {
+ SimpleString queue =3D RandomUtil.randomSimpleString();
+ SimpleString address =3D RandomUtil.randomSimpleString();
=
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer =3D session.createProducer(address);
+
+ ClientConsumer consumer =3D session.createConsumer(queue);
+
+ session.start();
+
+ for (int i =3D 0; i < 10; i++)
+ {
+ ClientMessage m =3D createTextMessage(Integer.toString(i), sessio=
n);
+ m.setPriority((byte)i);
+ producer.send(m);
+
+ Thread.sleep(20);
+ }
+
+ // The sleep after each send lets the messages reach the cli=
ent side buffer
+
+ // They should have been added to the delivering list in the ServerC=
onsumerImpl in the order
+ // they were sent, not priority order
+
+ //We receive one of the messages
+ ClientMessage m =3D consumer.receive(500);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(9, m.getPriority());
+
+ //Ack it
+ m.acknowledge();
+
+ consumer.close();
+ =
+ //Re-create the consumer and try to receive the other ones
+
+ consumer =3D session.createConsumer(queue);
+
+ // Other messages should be received now
+ // Previously there was a bug whereby if deliveries were stored on s=
erver side in send order
+ // then if received in priority order, and acked
+ // the ack would ack all messages up to the one received - resulting=
in acking
+ // messages that hadn't been delivered yet
+ for (int i =3D 8; i >=3D 0; i--)
+ {
+ m =3D consumer.receive(500);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(i, m.getPriority());
+
+ m.acknowledge();
+ }
+ =
+ consumer.close();
+
+ session.deleteQueue(queue);
+ }
+
// Package protected ---------------------------------------------
=
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/Ne=
ttyMultiThreadRandomReattachTest.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMul=
tiThreadRandomReattachTest.java 2010-04-07 08:35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMul=
tiThreadRandomReattachTest.java 2010-04-07 12:12:01 UTC (rev 9066)
@@ -51,6 +51,7 @@
final ClientSessionFactoryInternal sf =3D (ClientSessionFactoryInter=
nal) HornetQClient.createClientSessionFactory(new TransportConfiguration("o=
rg.hornetq.integration.transports.netty.NettyConnectorFactory"));
sf.setReconnectAttempts(-1);
sf.setConfirmationWindowSize(1024 * 1024);
+ sf.setAckBatchSize(0);
return sf;
}
=
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 08:=
35:35 UTC (rev 9065)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-04-07 12:=
12:01 UTC (rev 9066)
@@ -881,10 +881,10 @@
protected ClientMessage createTextMessage(final String s, final boolean=
durable, final ClientSession clientSession)
{
ClientMessage message =3D clientSession.createMessage(HornetQTextMes=
sage.TYPE,
- durable,
- 0,
- System.cur=
rentTimeMillis(),
- (byte)1);
+ durable,
+ 0,
+ System.currentTi=
meMillis(),
+ (byte)4);
message.getBodyBuffer().writeString(s);
return message;
}
--===============5722681955605955828==--