[hornetq-commits] JBoss hornetq SVN: r8242 - in trunk: src/main/org/hornetq/core/cluster/impl and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 6 10:59:18 EST 2009


Author: timfox
Date: 2009-11-06 10:59:15 -0500 (Fri, 06 Nov 2009)
New Revision: 8242

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/management/Notification.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/MessageReference.java
   trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
Log:
various tweaks, changes to prod flow control, thread names etc

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManager.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -26,7 +26,7 @@
 {
    ClientProducerCredits getCredits(SimpleString destination);
    
-   void receiveCredits(SimpleString destination, int credits);
+   void receiveCredits(SimpleString destination, int credits, int offset);
    
    void reset();
    

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -55,13 +55,13 @@
       return credits;
    }
 
-   public synchronized void receiveCredits(final SimpleString destination, final int credits)
+   public synchronized void receiveCredits(final SimpleString destination, final int credits, final int offset)
    {
       ClientProducerCredits cr = producerCredits.get(destination);
 
       if (cr != null)
       {
-         cr.receiveCredits(credits);
+         cr.receiveCredits(credits, offset);
       }
    }
    

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -22,9 +22,9 @@
  */
 public interface ClientProducerCredits
 {
-   void acquireCredits(final int credits) throws InterruptedException;
+   void acquireCredits(int credits) throws InterruptedException;
 
-   void receiveCredits(final int credits);
+   void receiveCredits(int credits, int offset);
    
    void reset();
    

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -38,6 +38,8 @@
    private final ClientSessionInternal session;
 
    private int arriving;
+   
+   private int offset;
 
    public ClientProducerCreditsImpl(final ClientSessionInternal session,
                                     final SimpleString destination,
@@ -58,16 +60,20 @@
       checkCredits(windowSize);
    }
 
-   public void acquireCredits(final int credits) throws InterruptedException
+   public void acquireCredits(int credits) throws InterruptedException
    {
+     // credits += offset;
+      
       checkCredits(credits);
 
       semaphore.acquire(credits);
    }
 
-   public synchronized void receiveCredits(final int credits)
+   public synchronized void receiveCredits(final int credits, final int offset)
    {
       arriving -= credits;
+      
+      this.offset = offset;
 
       semaphore.release(credits);
    }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -977,9 +977,9 @@
       return producerCreditManager.getCredits(address);
    }
    
-   public void handleReceiveProducerCredits(final SimpleString address, final int credits)
+   public void handleReceiveProducerCredits(final SimpleString address, final int credits, final int offset)
    {
-      producerCreditManager.receiveCredits(address, credits);
+      producerCreditManager.receiveCredits(address, credits, offset);
    }
 
    // CommandConfirmationHandler implementation ------------------------------------

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -71,5 +71,5 @@
    
    ClientProducerCredits getCredits(SimpleString address);
    
-   void handleReceiveProducerCredits(SimpleString address, int credits);
+   void handleReceiveProducerCredits(SimpleString address, int credits, int offset);
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -84,7 +84,8 @@
             {
                SessionProducerCreditsMessage message = (SessionProducerCreditsMessage)packet;
                
-               clientSession.handleReceiveProducerCredits(message.getAddress(), message.getCredits());
+               clientSession.handleReceiveProducerCredits(message.getAddress(), message.getCredits(),
+                                                          message.getOffset());
                
                break;
             }

Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -539,8 +539,8 @@
       return session.getCredits(address);
    }
 
-   public void handleReceiveProducerCredits(SimpleString address, int credits)
+   public void handleReceiveProducerCredits(SimpleString address, int credits, int offset)
    {
-      session.handleReceiveProducerCredits(address, credits);
+      session.handleReceiveProducerCredits(address, credits, offset);
    }  
 }

Modified: trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -116,7 +116,7 @@
 
       started = true;
 
-      thread = new Thread(this);
+      thread = new Thread(this, "hornetq-discovery-group-thread-" + name);
 
       thread.setDaemon(true);
 
@@ -126,7 +126,7 @@
       {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
-         Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props );
+         Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props);
          notificationService.sendNotification(notification );
       }
    }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -3506,6 +3506,8 @@
 
       private PerfBlast(final int pages)
       {
+         super("hornetq-perfblast-thread");
+         
          this.pages = pages;
       }
 

Modified: trunk/src/main/org/hornetq/core/management/Notification.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/Notification.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/management/Notification.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -29,8 +29,10 @@
    private final NotificationType type;
 
    private final TypedProperties properties;
+   
+   private final String uid;
 
-   public Notification(String uid, final NotificationType type, final TypedProperties properties)
+   public Notification(final String uid, final NotificationType type, final TypedProperties properties)
    {
       this.uid = uid;
       this.type = type;
@@ -46,14 +48,12 @@
    {
       return properties;
    }
-
-   private String uid;
-
+   
    public String getUID()
    {
       return uid;
    }
-   
+
    @Override
    public String toString()
    {

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -132,15 +132,15 @@
    protected MessageImpl(final MessageImpl other)
    {
       this();
-      this.messageID = other.messageID;
-      this.destination = other.destination;
-      this.type = other.type;
-      this.durable = other.durable;
-      this.expiration = other.expiration;
-      this.timestamp = other.timestamp;
-      this.priority = other.priority;
-      this.properties = new TypedProperties(other.properties);
-      this.body = other.body;
+      messageID = other.messageID;
+      destination = other.destination;
+      type = other.type;
+      durable = other.durable;
+      expiration = other.expiration;
+      timestamp = other.timestamp;
+      priority = other.priority;
+      properties = new TypedProperties(other.properties);
+      body = other.body;
    }
 
    /*
@@ -149,15 +149,15 @@
    protected MessageImpl(final Message other)
    {
       this();
-      this.messageID = other.getMessageID();
-      this.destination = other.getDestination();
-      this.type = other.getType();
-      this.durable = other.isDurable();
-      this.expiration = other.getExpiration();
-      this.timestamp = other.getTimestamp();
-      this.priority = other.getPriority();
-      this.properties = new TypedProperties(other.getProperties());
-      this.body = other.getBody();
+      messageID = other.getMessageID();
+      destination = other.getDestination();
+      type = other.getType();
+      durable = other.isDurable();
+      expiration = other.getExpiration();
+      timestamp = other.getTimestamp();
+      priority = other.getPriority();
+      properties = new TypedProperties(other.getProperties());
+      body = other.getBody();
    }
 
    protected MessageImpl(final long messageID)
@@ -196,7 +196,7 @@
       return body.writerIndex();
    }
 
-   public void encodeHeadersAndProperties(HornetQBuffer buffer)
+   public void encodeHeadersAndProperties(final HornetQBuffer buffer)
    {
       buffer.writeLong(messageID);
       buffer.writeSimpleString(destination);
@@ -208,14 +208,14 @@
       properties.encode(buffer);
    }
 
-   public void encodeBody(HornetQBuffer buffer)
+   public void encodeBody(final HornetQBuffer buffer)
    {
       HornetQBuffer localBody = getBody();
       buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
    }
 
    // Used on Message chunk side
-   public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+   public void encodeBody(final HornetQBuffer bufferOut, final LargeMessageEncodingContext context, final int size)
    {
       context.write(bufferOut, size);
    }
@@ -330,7 +330,7 @@
    /**
     * @param bodyInputStream the bodyInputStream to set
     */
-   public void setBodyInputStream(InputStream bodyInputStream)
+   public void setBodyInputStream(final InputStream bodyInputStream)
    {
       this.bodyInputStream = bodyInputStream;
    }
@@ -400,8 +400,8 @@
    {
       properties.putSimpleStringProperty(key, value);
    }
-   
-   public void putObjectProperty(SimpleString key, Object value) throws PropertyConversionException
+
+   public void putObjectProperty(final SimpleString key, final Object value) throws PropertyConversionException
    {
       if (value == null)
       {
@@ -447,7 +447,7 @@
       }
    }
 
-   public void putObjectProperty(String key, Object value) throws PropertyConversionException
+   public void putObjectProperty(final String key, final Object value) throws PropertyConversionException
    {
       putObjectProperty(new SimpleString(key), value);
    }
@@ -497,7 +497,7 @@
       properties.putSimpleStringProperty(new SimpleString(key), new SimpleString(value));
    }
 
-   public void putTypedProperties(TypedProperties otherProps)
+   public void putTypedProperties(final TypedProperties otherProps)
    {
       properties.putTypedProperties(otherProps);
    }
@@ -507,87 +507,87 @@
       return properties.getProperty(key);
    }
 
-   public Boolean getBooleanProperty(SimpleString key) throws PropertyConversionException
+   public Boolean getBooleanProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getBooleanProperty(key);
    }
-   
-   public Boolean getBooleanProperty(String key) throws PropertyConversionException
+
+   public Boolean getBooleanProperty(final String key) throws PropertyConversionException
    {
       return properties.getBooleanProperty(new SimpleString(key));
    }
 
-   public Byte getByteProperty(SimpleString key) throws PropertyConversionException
+   public Byte getByteProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getByteProperty(key);
    }
-   
-   public Byte getByteProperty(String key) throws PropertyConversionException
+
+   public Byte getByteProperty(final String key) throws PropertyConversionException
    {
       return properties.getByteProperty(new SimpleString(key));
    }
 
-   public byte[] getBytesProperty(SimpleString key) throws PropertyConversionException
+   public byte[] getBytesProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getBytesProperty(key);
    }
-   
-   public byte[] getBytesProperty(String key) throws PropertyConversionException
+
+   public byte[] getBytesProperty(final String key) throws PropertyConversionException
    {
       return getBytesProperty(new SimpleString(key));
    }
 
-   public Double getDoubleProperty(SimpleString key) throws PropertyConversionException
+   public Double getDoubleProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getDoubleProperty(key);
    }
 
-   public Double getDoubleProperty(String key) throws PropertyConversionException
+   public Double getDoubleProperty(final String key) throws PropertyConversionException
    {
       return properties.getDoubleProperty(new SimpleString(key));
    }
 
-   public Integer getIntProperty(SimpleString key) throws PropertyConversionException
+   public Integer getIntProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getIntProperty(key);
    }
 
-   public Integer getIntProperty(String key) throws PropertyConversionException
+   public Integer getIntProperty(final String key) throws PropertyConversionException
    {
       return properties.getIntProperty(new SimpleString(key));
    }
 
-   public Long getLongProperty(SimpleString key) throws PropertyConversionException
+   public Long getLongProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getLongProperty(key);
    }
-   
-   public Long getLongProperty(String key) throws PropertyConversionException
+
+   public Long getLongProperty(final String key) throws PropertyConversionException
    {
       return properties.getLongProperty(new SimpleString(key));
    }
 
-   public Short getShortProperty(SimpleString key) throws PropertyConversionException
+   public Short getShortProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getShortProperty(key);
    }
-   
-   public Short getShortProperty(String key) throws PropertyConversionException
+
+   public Short getShortProperty(final String key) throws PropertyConversionException
    {
       return properties.getShortProperty(new SimpleString(key));
    }
 
-   public Float getFloatProperty(SimpleString key) throws PropertyConversionException
+   public Float getFloatProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getFloatProperty(key);
    }
 
-   public Float getFloatProperty(String key) throws PropertyConversionException
+   public Float getFloatProperty(final String key) throws PropertyConversionException
    {
       return properties.getFloatProperty(new SimpleString(key));
    }
-   
-   public String getStringProperty(SimpleString key) throws PropertyConversionException
+
+   public String getStringProperty(final SimpleString key) throws PropertyConversionException
    {
       SimpleString str = getSimpleStringProperty(key);
 
@@ -600,18 +600,18 @@
          return str.toString();
       }
    }
-   
-   public String getStringProperty(String key) throws PropertyConversionException
+
+   public String getStringProperty(final String key) throws PropertyConversionException
    {
       return getStringProperty(new SimpleString(key));
    }
-   
-   public SimpleString getSimpleStringProperty(SimpleString key) throws PropertyConversionException
+
+   public SimpleString getSimpleStringProperty(final SimpleString key) throws PropertyConversionException
    {
       return properties.getSimpleStringProperty(key);
    }
-   
-   public SimpleString getSimpleStringProperty(String key) throws PropertyConversionException
+
+   public SimpleString getSimpleStringProperty(final String key) throws PropertyConversionException
    {
       return properties.getSimpleStringProperty(new SimpleString(key));
    }
@@ -620,7 +620,7 @@
    {
       return properties.getProperty(new SimpleString(key));
    }
-   
+
    public Object removeProperty(final SimpleString key)
    {
       return properties.removeProperty(key);
@@ -648,7 +648,7 @@
 
    public TypedProperties getProperties()
    {
-      return this.properties;
+      return properties;
    }
 
    // Body

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -40,6 +40,7 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.MessageReferenceImpl;
 import org.hornetq.core.server.impl.ServerProducerCreditManager;
 import org.hornetq.core.server.impl.ServerProducerCreditManagerImpl;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -291,7 +292,7 @@
 
    public void addSize(final MessageReference reference, final boolean add) throws Exception
    {
-      long size = reference.getMemoryEstimate();
+      long size = MessageReferenceImpl.getMemoryEstimate();
 
       if (add)
       {

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -959,7 +959,7 @@
    {
       if (reaperPeriod > 0)
       {
-         reaperThread = new Thread(reaperRunnable, "HornetQ-expiry-reaper");
+         reaperThread = new Thread(reaperRunnable, "hornetq-expiry-reaper-thread");
 
          reaperThread.setPriority(reaperPriority);
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionProducerCreditsMessage.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -28,20 +28,24 @@
    // Attributes ----------------------------------------------------
 
    private int credits;
-   
+
    private SimpleString address;
 
+   private int offset;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionProducerCreditsMessage(final int credits, final SimpleString address)
+   public SessionProducerCreditsMessage(final int credits, final SimpleString address, final int offset)
    {
       super(SESS_PRODUCER_CREDITS);
 
       this.credits = credits;
-      
+
       this.address = address;
+
+      this.offset = offset;
    }
 
    public SessionProducerCreditsMessage()
@@ -55,22 +59,23 @@
    {
       return credits;
    }
-   
+
    public SimpleString getAddress()
    {
       return address;
    }
-   
-//   public boolean isRequiresConfirmations()
-//   {
-//      return false;
-//   }
 
+   public int getOffset()
+   {
+      return offset;
+   }
+
    @Override
    public void encodeBody(final HornetQBuffer buffer)
    {
       buffer.writeInt(credits);
       buffer.writeSimpleString(address);
+      buffer.writeInt(offset);
    }
 
    @Override
@@ -78,11 +83,14 @@
    {
       credits = buffer.readInt();
       address = buffer.readSimpleString();
+      offset = buffer.readInt();
    }
 
    public int getRequiredBufferSize()
    {
-      int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT + SimpleString.sizeofString(address);
+      int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+                 SimpleString.sizeofString(address) +
+                 DataConstants.SIZE_INT;
 
       return size;
    }

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -444,6 +444,8 @@
 
       FailureCheckThread(final long pauseInterval)
       {
+         super("hornetq-failure-check-thread");
+         
          this.pauseInterval = pauseInterval;
       }
 

Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -40,8 +40,6 @@
 
    void setScheduledDeliveryTime(long scheduledDeliveryTime);
 
-   int getMemoryEstimate();
-
    int getDeliveryCount();
 
    void setDeliveryCount(int deliveryCount);

Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -185,11 +185,6 @@
          return ref.getDeliveryCount();
       }
 
-      public int getMemoryEstimate()
-      {
-         return ref.getMemoryEstimate();
-      }
-
       public ServerMessage getMessage()
       {
          return ref.getMessage();

Modified: trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -74,7 +74,7 @@
 
       started = true;
 
-      thread = new Thread(new MemoryRunnable());
+      thread = new Thread(new MemoryRunnable(), "hornetq-memory-manager-thread");
 
       thread.setDaemon(true);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -45,6 +45,13 @@
 
    // Constructors --------------------------------------------------
 
+   public MessageReferenceImpl()
+   {
+      queue = null;
+      
+      message = null;
+   }
+   
    public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue)
    {
       deliveryCount = other.deliveryCount;
@@ -69,7 +76,7 @@
       return new MessageReferenceImpl(this, queue);
    }
 
-   public int getMemoryEstimate()
+   public static int getMemoryEstimate()
    {
       // from few tests I have done, deliveryCount and scheduledDelivery will use two longs (because of alignment)
       // and each of the references (messages and queue) will use the equivalent to two longs (because of long

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -28,6 +28,7 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.impl.ClientMessageImpl;
 import org.hornetq.core.client.management.impl.ManagementHelper;
 import org.hornetq.core.exception.HornetQException;
@@ -112,7 +113,31 @@
    private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
 
    // Static -------------------------------------------------------------------------------
+   
+   private static int offset;
 
+   static
+   {
+      try
+      {
+         ServerMessage msg = new ServerMessageImpl();
+         
+         msg.setBody(ChannelBuffers.dynamicBuffer(0));
+   
+         msg.setDestination(new SimpleString("foobar"));
+   
+         int es = msg.getEncodeSize();
+   
+         int me = msg.getMemoryEstimate();
+   
+         offset = MessageReferenceImpl.getMemoryEstimate() + me - es;
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to initialise mult and offset", e);
+      }
+   }
+
    // Attributes ----------------------------------------------------------------------------
 
    private final long id;
@@ -1895,7 +1920,7 @@
 
       return holder;
    }
-   
+
    private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws Exception
    {
       CreditManagerHolder holder = creditManagerHolders.get(address);
@@ -1903,7 +1928,7 @@
       if (holder == null)
       {
          PagingStore store = postOffice.getPagingManager().getPageStore(address);
-         
+
          holder = new CreditManagerHolder(store);
 
          creditManagerHolders.put(address, holder);
@@ -1913,14 +1938,14 @@
    }
 
    private void sendProducerCredits(final CreditManagerHolder holder, final int credits, final SimpleString address)
-   {
+   {      
       holder.outstandingCredits += credits;
 
-      Packet packet = new SessionProducerCreditsMessage(credits, address);
+      Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
 
       channel.send(packet);
    }
-
+   
    private void send(final ServerMessage msg) throws Exception
    {
       // Look up the paging store

Modified: trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java
===================================================================
--- trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/integration/bootstrap/HornetQBootstrapServer.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -270,6 +270,11 @@
 
    protected class Shutdown extends Thread
    {
+      public Shutdown()
+      {
+         super("hornetq-shutdown-thread");
+      }
+      
       public void run()
       {
          HornetQBootstrapServer.this.shutDown();

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -512,14 +512,7 @@
       {
          if (connections.remove(connectionID) != null)
          {
-            // // Execute on different thread to avoid deadlocks
-            // new Thread()
-            // {
-            // public void run()
-            // {
             listener.connectionDestroyed(connectionID);
-            // }
-            // }.start();
          }
       }
 

Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -318,7 +318,7 @@
                      
             timeChecker = new BatchTimeChecker();
             
-            checkerThread = new Thread(timeChecker);
+            checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
             
             batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
             
@@ -1415,7 +1415,7 @@
       //In the case of onMessage we can't close the connection from inside the onMessage method
       //since it will block waiting for onMessage to complete. In the case of start we want to return
       //from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
-      Thread t = new Thread(failureHandler);
+      Thread t = new Thread(failureHandler, "jmsbridge-failurehandler-thread");
       
       t.start();         
    }
@@ -1504,6 +1504,11 @@
     */
    private final class SourceReceiver extends Thread
    {
+      SourceReceiver()
+      {
+         super("jmsbridge-source-receiver-thread");
+      }
+      
       @Override
       public void run()
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java	2009-11-06 15:32:46 UTC (rev 8241)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java	2009-11-06 15:59:15 UTC (rev 8242)
@@ -12,22 +12,25 @@
  */
 package org.hornetq.tests.integration.cluster.failover;
 
+import java.util.Map;
+
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQ;
 
-import java.util.Map;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *         Created Oct 26, 2009
  */
 public class GroupingFailoverReplicationTest extends GroupingFailoverTestBase
 {
+   private static final Logger log = Logger.getLogger(GroupingFailoverReplicationTest.class);
 
+   
    protected void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int backupNode)
    {
       if (servers[node] != null)



More information about the hornetq-commits mailing list