Author: timfox
Date: 2009-11-06 10:59:15 -0500 (Fri, 06 Nov 2009)
New Revision: 8242
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/management/Notification.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/MessageReference.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
Log:
Various tweaks: changes to producer flow control, thread names, etc.
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -26,7 +26,7 @@
{
ClientProducerCredits getCredits(SimpleString destination);
- void receiveCredits(SimpleString destination, int credits);
+ void receiveCredits(SimpleString destination, int credits, int offset);
void reset();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -55,13 +55,13 @@
return credits;
}
- public synchronized void receiveCredits(final SimpleString destination, final int
credits)
+ public synchronized void receiveCredits(final SimpleString destination, final int
credits, final int offset)
{
ClientProducerCredits cr = producerCredits.get(destination);
if (cr != null)
{
- cr.receiveCredits(credits);
+ cr.receiveCredits(credits, offset);
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -22,9 +22,9 @@
*/
public interface ClientProducerCredits
{
- void acquireCredits(final int credits) throws InterruptedException;
+ void acquireCredits(int credits) throws InterruptedException;
- void receiveCredits(final int credits);
+ void receiveCredits(int credits, int offset);
void reset();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -38,6 +38,8 @@
private final ClientSessionInternal session;
private int arriving;
+
+ private int offset;
public ClientProducerCreditsImpl(final ClientSessionInternal session,
final SimpleString destination,
@@ -58,16 +60,20 @@
checkCredits(windowSize);
}
- public void acquireCredits(final int credits) throws InterruptedException
+ public void acquireCredits(int credits) throws InterruptedException
{
+ // credits += offset;
+
checkCredits(credits);
semaphore.acquire(credits);
}
- public synchronized void receiveCredits(final int credits)
+ public synchronized void receiveCredits(final int credits, final int offset)
{
arriving -= credits;
+
+ this.offset = offset;
semaphore.release(credits);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -977,9 +977,9 @@
return producerCreditManager.getCredits(address);
}
- public void handleReceiveProducerCredits(final SimpleString address, final int
credits)
+ public void handleReceiveProducerCredits(final SimpleString address, final int
credits, final int offset)
{
- producerCreditManager.receiveCredits(address, credits);
+ producerCreditManager.receiveCredits(address, credits, offset);
}
// CommandConfirmationHandler implementation ------------------------------------
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -71,5 +71,5 @@
ClientProducerCredits getCredits(SimpleString address);
- void handleReceiveProducerCredits(SimpleString address, int credits);
+ void handleReceiveProducerCredits(SimpleString address, int credits, int offset);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -84,7 +84,8 @@
{
SessionProducerCreditsMessage message =
(SessionProducerCreditsMessage)packet;
- clientSession.handleReceiveProducerCredits(message.getAddress(),
message.getCredits());
+ clientSession.handleReceiveProducerCredits(message.getAddress(),
message.getCredits(),
+ message.getOffset());
break;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -539,8 +539,8 @@
return session.getCredits(address);
}
- public void handleReceiveProducerCredits(SimpleString address, int credits)
+ public void handleReceiveProducerCredits(SimpleString address, int credits, int
offset)
{
- session.handleReceiveProducerCredits(address, credits);
+ session.handleReceiveProducerCredits(address, credits, offset);
}
}
Modified: trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -116,7 +116,7 @@
started = true;
- thread = new Thread(this);
+ thread = new Thread(this, "hornetq-discovery-group-thread-" + name);
thread.setDaemon(true);
@@ -126,7 +126,7 @@
{
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), new
SimpleString(name));
- Notification notification = new Notification(nodeID,
NotificationType.DISCOVERY_GROUP_STARTED, props );
+ Notification notification = new Notification(nodeID,
NotificationType.DISCOVERY_GROUP_STARTED, props);
notificationService.sendNotification(notification );
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -3506,6 +3506,8 @@
private PerfBlast(final int pages)
{
+ super("hornetq-perfblast-thread");
+
this.pages = pages;
}
Modified: trunk/src/main/org/hornetq/core/management/Notification.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/Notification.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/management/Notification.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -29,8 +29,10 @@
private final NotificationType type;
private final TypedProperties properties;
+
+ private final String uid;
- public Notification(String uid, final NotificationType type, final TypedProperties
properties)
+ public Notification(final String uid, final NotificationType type, final
TypedProperties properties)
{
this.uid = uid;
this.type = type;
@@ -46,14 +48,12 @@
{
return properties;
}
-
- private String uid;
-
+
public String getUID()
{
return uid;
}
-
+
@Override
public String toString()
{
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -132,15 +132,15 @@
protected MessageImpl(final MessageImpl other)
{
this();
- this.messageID = other.messageID;
- this.destination = other.destination;
- this.type = other.type;
- this.durable = other.durable;
- this.expiration = other.expiration;
- this.timestamp = other.timestamp;
- this.priority = other.priority;
- this.properties = new TypedProperties(other.properties);
- this.body = other.body;
+ messageID = other.messageID;
+ destination = other.destination;
+ type = other.type;
+ durable = other.durable;
+ expiration = other.expiration;
+ timestamp = other.timestamp;
+ priority = other.priority;
+ properties = new TypedProperties(other.properties);
+ body = other.body;
}
/*
@@ -149,15 +149,15 @@
protected MessageImpl(final Message other)
{
this();
- this.messageID = other.getMessageID();
- this.destination = other.getDestination();
- this.type = other.getType();
- this.durable = other.isDurable();
- this.expiration = other.getExpiration();
- this.timestamp = other.getTimestamp();
- this.priority = other.getPriority();
- this.properties = new TypedProperties(other.getProperties());
- this.body = other.getBody();
+ messageID = other.getMessageID();
+ destination = other.getDestination();
+ type = other.getType();
+ durable = other.isDurable();
+ expiration = other.getExpiration();
+ timestamp = other.getTimestamp();
+ priority = other.getPriority();
+ properties = new TypedProperties(other.getProperties());
+ body = other.getBody();
}
protected MessageImpl(final long messageID)
@@ -196,7 +196,7 @@
return body.writerIndex();
}
- public void encodeHeadersAndProperties(HornetQBuffer buffer)
+ public void encodeHeadersAndProperties(final HornetQBuffer buffer)
{
buffer.writeLong(messageID);
buffer.writeSimpleString(destination);
@@ -208,14 +208,14 @@
properties.encode(buffer);
}
- public void encodeBody(HornetQBuffer buffer)
+ public void encodeBody(final HornetQBuffer buffer)
{
HornetQBuffer localBody = getBody();
buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
}
// Used on Message chunk side
- public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext
context, int size)
+ public void encodeBody(final HornetQBuffer bufferOut, final
LargeMessageEncodingContext context, final int size)
{
context.write(bufferOut, size);
}
@@ -330,7 +330,7 @@
/**
* @param bodyInputStream the bodyInputStream to set
*/
- public void setBodyInputStream(InputStream bodyInputStream)
+ public void setBodyInputStream(final InputStream bodyInputStream)
{
this.bodyInputStream = bodyInputStream;
}
@@ -400,8 +400,8 @@
{
properties.putSimpleStringProperty(key, value);
}
-
- public void putObjectProperty(SimpleString key, Object value) throws
PropertyConversionException
+
+ public void putObjectProperty(final SimpleString key, final Object value) throws
PropertyConversionException
{
if (value == null)
{
@@ -447,7 +447,7 @@
}
}
- public void putObjectProperty(String key, Object value) throws
PropertyConversionException
+ public void putObjectProperty(final String key, final Object value) throws
PropertyConversionException
{
putObjectProperty(new SimpleString(key), value);
}
@@ -497,7 +497,7 @@
properties.putSimpleStringProperty(new SimpleString(key), new
SimpleString(value));
}
- public void putTypedProperties(TypedProperties otherProps)
+ public void putTypedProperties(final TypedProperties otherProps)
{
properties.putTypedProperties(otherProps);
}
@@ -507,87 +507,87 @@
return properties.getProperty(key);
}
- public Boolean getBooleanProperty(SimpleString key) throws
PropertyConversionException
+ public Boolean getBooleanProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getBooleanProperty(key);
}
-
- public Boolean getBooleanProperty(String key) throws PropertyConversionException
+
+ public Boolean getBooleanProperty(final String key) throws
PropertyConversionException
{
return properties.getBooleanProperty(new SimpleString(key));
}
- public Byte getByteProperty(SimpleString key) throws PropertyConversionException
+ public Byte getByteProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getByteProperty(key);
}
-
- public Byte getByteProperty(String key) throws PropertyConversionException
+
+ public Byte getByteProperty(final String key) throws PropertyConversionException
{
return properties.getByteProperty(new SimpleString(key));
}
- public byte[] getBytesProperty(SimpleString key) throws PropertyConversionException
+ public byte[] getBytesProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getBytesProperty(key);
}
-
- public byte[] getBytesProperty(String key) throws PropertyConversionException
+
+ public byte[] getBytesProperty(final String key) throws PropertyConversionException
{
return getBytesProperty(new SimpleString(key));
}
- public Double getDoubleProperty(SimpleString key) throws PropertyConversionException
+ public Double getDoubleProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getDoubleProperty(key);
}
- public Double getDoubleProperty(String key) throws PropertyConversionException
+ public Double getDoubleProperty(final String key) throws PropertyConversionException
{
return properties.getDoubleProperty(new SimpleString(key));
}
- public Integer getIntProperty(SimpleString key) throws PropertyConversionException
+ public Integer getIntProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getIntProperty(key);
}
- public Integer getIntProperty(String key) throws PropertyConversionException
+ public Integer getIntProperty(final String key) throws PropertyConversionException
{
return properties.getIntProperty(new SimpleString(key));
}
- public Long getLongProperty(SimpleString key) throws PropertyConversionException
+ public Long getLongProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getLongProperty(key);
}
-
- public Long getLongProperty(String key) throws PropertyConversionException
+
+ public Long getLongProperty(final String key) throws PropertyConversionException
{
return properties.getLongProperty(new SimpleString(key));
}
- public Short getShortProperty(SimpleString key) throws PropertyConversionException
+ public Short getShortProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getShortProperty(key);
}
-
- public Short getShortProperty(String key) throws PropertyConversionException
+
+ public Short getShortProperty(final String key) throws PropertyConversionException
{
return properties.getShortProperty(new SimpleString(key));
}
- public Float getFloatProperty(SimpleString key) throws PropertyConversionException
+ public Float getFloatProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getFloatProperty(key);
}
- public Float getFloatProperty(String key) throws PropertyConversionException
+ public Float getFloatProperty(final String key) throws PropertyConversionException
{
return properties.getFloatProperty(new SimpleString(key));
}
-
- public String getStringProperty(SimpleString key) throws PropertyConversionException
+
+ public String getStringProperty(final SimpleString key) throws
PropertyConversionException
{
SimpleString str = getSimpleStringProperty(key);
@@ -600,18 +600,18 @@
return str.toString();
}
}
-
- public String getStringProperty(String key) throws PropertyConversionException
+
+ public String getStringProperty(final String key) throws PropertyConversionException
{
return getStringProperty(new SimpleString(key));
}
-
- public SimpleString getSimpleStringProperty(SimpleString key) throws
PropertyConversionException
+
+ public SimpleString getSimpleStringProperty(final SimpleString key) throws
PropertyConversionException
{
return properties.getSimpleStringProperty(key);
}
-
- public SimpleString getSimpleStringProperty(String key) throws
PropertyConversionException
+
+ public SimpleString getSimpleStringProperty(final String key) throws
PropertyConversionException
{
return properties.getSimpleStringProperty(new SimpleString(key));
}
@@ -620,7 +620,7 @@
{
return properties.getProperty(new SimpleString(key));
}
-
+
public Object removeProperty(final SimpleString key)
{
return properties.removeProperty(key);
@@ -648,7 +648,7 @@
public TypedProperties getProperties()
{
- return this.properties;
+ return properties;
}
// Body
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -40,6 +40,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.MessageReferenceImpl;
import org.hornetq.core.server.impl.ServerProducerCreditManager;
import org.hornetq.core.server.impl.ServerProducerCreditManagerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -291,7 +292,7 @@
public void addSize(final MessageReference reference, final boolean add) throws
Exception
{
- long size = reference.getMemoryEstimate();
+ long size = MessageReferenceImpl.getMemoryEstimate();
if (add)
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -959,7 +959,7 @@
{
if (reaperPeriod > 0)
{
- reaperThread = new Thread(reaperRunnable, "HornetQ-expiry-reaper");
+ reaperThread = new Thread(reaperRunnable,
"hornetq-expiry-reaper-thread");
reaperThread.setPriority(reaperPriority);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -28,20 +28,24 @@
// Attributes ----------------------------------------------------
private int credits;
-
+
private SimpleString address;
+ private int offset;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionProducerCreditsMessage(final int credits, final SimpleString address)
+ public SessionProducerCreditsMessage(final int credits, final SimpleString address,
final int offset)
{
super(SESS_PRODUCER_CREDITS);
this.credits = credits;
-
+
this.address = address;
+
+ this.offset = offset;
}
public SessionProducerCreditsMessage()
@@ -55,22 +59,23 @@
{
return credits;
}
-
+
public SimpleString getAddress()
{
return address;
}
-
-// public boolean isRequiresConfirmations()
-// {
-// return false;
-// }
+ public int getOffset()
+ {
+ return offset;
+ }
+
@Override
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeInt(credits);
buffer.writeSimpleString(address);
+ buffer.writeInt(offset);
}
@Override
@@ -78,11 +83,14 @@
{
credits = buffer.readInt();
address = buffer.readSimpleString();
+ offset = buffer.readInt();
}
public int getRequiredBufferSize()
{
- int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
SimpleString.sizeofString(address);
+ int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+ SimpleString.sizeofString(address) +
+ DataConstants.SIZE_INT;
return size;
}
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -444,6 +444,8 @@
FailureCheckThread(final long pauseInterval)
{
+ super("hornetq-failure-check-thread");
+
this.pauseInterval = pauseInterval;
}
Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -40,8 +40,6 @@
void setScheduledDeliveryTime(long scheduledDeliveryTime);
- int getMemoryEstimate();
-
int getDeliveryCount();
void setDeliveryCount(int deliveryCount);
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -185,11 +185,6 @@
return ref.getDeliveryCount();
}
- public int getMemoryEstimate()
- {
- return ref.getMemoryEstimate();
- }
-
public ServerMessage getMessage()
{
return ref.getMessage();
Modified: trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -74,7 +74,7 @@
started = true;
- thread = new Thread(new MemoryRunnable());
+ thread = new Thread(new MemoryRunnable(),
"hornetq-memory-manager-thread");
thread.setDaemon(true);
Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -45,6 +45,13 @@
// Constructors --------------------------------------------------
+ public MessageReferenceImpl()
+ {
+ queue = null;
+
+ message = null;
+ }
+
public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue)
{
deliveryCount = other.deliveryCount;
@@ -69,7 +76,7 @@
return new MessageReferenceImpl(this, queue);
}
- public int getMemoryEstimate()
+ public static int getMemoryEstimate()
{
// from few tests I have done, deliveryCount and scheduledDelivery will use two
longs (because of alignment)
// and each of the references (messages and queue) will use the equivalent to two
longs (because of long
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -28,6 +28,7 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
@@ -112,7 +113,31 @@
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
// Static
-------------------------------------------------------------------------------
+
+ private static int offset;
+ static
+ {
+ try
+ {
+ ServerMessage msg = new ServerMessageImpl();
+
+ msg.setBody(ChannelBuffers.dynamicBuffer(0));
+
+ msg.setDestination(new SimpleString("foobar"));
+
+ int es = msg.getEncodeSize();
+
+ int me = msg.getMemoryEstimate();
+
+ offset = MessageReferenceImpl.getMemoryEstimate() + me - es;
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to initialise mult and offset", e);
+ }
+ }
+
// Attributes
----------------------------------------------------------------------------
private final long id;
@@ -1895,7 +1920,7 @@
return holder;
}
-
+
private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws
Exception
{
CreditManagerHolder holder = creditManagerHolders.get(address);
@@ -1903,7 +1928,7 @@
if (holder == null)
{
PagingStore store = postOffice.getPagingManager().getPageStore(address);
-
+
holder = new CreditManagerHolder(store);
creditManagerHolders.put(address, holder);
@@ -1913,14 +1938,14 @@
}
private void sendProducerCredits(final CreditManagerHolder holder, final int credits,
final SimpleString address)
- {
+ {
holder.outstandingCredits += credits;
- Packet packet = new SessionProducerCreditsMessage(credits, address);
+ Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
channel.send(packet);
}
-
+
private void send(final ServerMessage msg) throws Exception
{
// Look up the paging store
Modified: trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
===================================================================
--- trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -270,6 +270,11 @@
protected class Shutdown extends Thread
{
+ public Shutdown()
+ {
+ super("hornetq-shutdown-thread");
+ }
+
public void run()
{
HornetQBootstrapServer.this.shutDown();
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -512,14 +512,7 @@
{
if (connections.remove(connectionID) != null)
{
- // // Execute on different thread to avoid deadlocks
- // new Thread()
- // {
- // public void run()
- // {
listener.connectionDestroyed(connectionID);
- // }
- // }.start();
}
}
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -318,7 +318,7 @@
timeChecker = new BatchTimeChecker();
- checkerThread = new Thread(timeChecker);
+ checkerThread = new Thread(timeChecker,
"jmsbridge-checker-thread");
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
@@ -1415,7 +1415,7 @@
//In the case of onMessage we can't close the connection from inside the
onMessage method
//since it will block waiting for onMessage to complete. In the case of start we
want to return
//from the call before the connections are reestablished so that the caller is not
blocked unnecessarily.
- Thread t = new Thread(failureHandler);
+ Thread t = new Thread(failureHandler,
"jmsbridge-failurehandler-thread");
t.start();
}
@@ -1504,6 +1504,11 @@
*/
private final class SourceReceiver extends Thread
{
+ SourceReceiver()
+ {
+ super("jmsbridge-source-receiver-thread");
+ }
+
@Override
public void run()
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-06 15:59:15 UTC (rev 8242)
@@ -12,22 +12,25 @@
*/
package org.hornetq.tests.integration.cluster.failover;
+import java.util.Map;
+
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQ;
-import java.util.Map;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
*/
public class GroupingFailoverReplicationTest extends GroupingFailoverTestBase
{
+ private static final Logger log =
Logger.getLogger(GroupingFailoverReplicationTest.class);
+
protected void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int
backupNode)
{
if (servers[node] != null)