[jboss-cvs] JBoss Messaging SVN: r3828 - in trunk/src/main/org/jboss/messaging/core: remoting/impl/codec and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Feb 28 05:55:34 EST 2008


Author: timfox
Date: 2008-02-28 05:55:34 -0500 (Thu, 28 Feb 2008)
New Revision: 3828

Added:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
   trunk/src/main/org/jboss/messaging/core/server/FlowController.java
   trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java
Log:
New files


Added: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java	2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,21 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.client.ClientProducer;
+
+/**
+ * Internal extension of {@link ClientProducer} that additionally lets tokens be
+ * delivered to the producer, e.g. those carried by a PROD_RECEIVETOKENS packet.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public interface ClientProducerInternal extends ClientProducer
+{
+   /** Hands {@code tokens} tokens to this producer; implementations may throw. */
+   void receiveTokens(int tokens) throws Exception;
+}

Added: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java	2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,65 @@
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketType;
+
+/**
+ * 
+ * A ClientProducerPacketHandler
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ClientProducerPacketHandler implements PacketHandler
+{
+   private static final Logger log = Logger.getLogger(ClientProducerPacketHandler.class);
+
+   private final ClientProducerInternal clientProducer;
+
+   private final String producerID;
+
+   public ClientProducerPacketHandler(final ClientProducerInternal clientProducer, final String producerID)
+   {
+      this.clientProducer = clientProducer;
+      
+      this.producerID = producerID;
+   }
+
+   public String getID()
+   {
+      return producerID;
+   }
+
+   public void handle(final Packet packet, final PacketSender sender)
+   {
+      try
+      {
+         PacketType type = packet.getType();
+         
+         if (type == PacketType.PROD_RECEIVETOKENS)
+         {
+            ProducerReceiveTokensMessage message = (ProducerReceiveTokensMessage) packet;
+            
+            clientProducer.receiveTokens(message.getTokens());
+         }
+         else
+         {
+         	throw new IllegalStateException("Invalid packet: " + type);
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle packet", e);
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return "ClientProducerPacketHandler[id=" + producerID + "]";
+   }
+}
\ No newline at end of file

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java	2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.codec;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.PROD_RECEIVETOKENS;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+
+/**
+ * Codec for {@link ProducerReceiveTokensMessage}: the body is a length field
+ * (always INT_LENGTH) followed by the token count, both written with putInt.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ProducerReceiveTokensMessageCodec extends AbstractPacketCodec<ProducerReceiveTokensMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /** Passes the PROD_RECEIVETOKENS packet type to the base codec. */
+   public ProducerReceiveTokensMessageCodec()
+   {
+      super(PROD_RECEIVETOKENS);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /** Writes the body length (INT_LENGTH) and then the message's token count. */
+   @Override
+   protected void encodeBody(ProducerReceiveTokensMessage message, RemotingBuffer out) throws Exception
+   {
+      out.putInt(INT_LENGTH);
+      out.putInt(message.getTokens());
+   }
+
+   /**
+    * Reads the body length, then the token count. Returns {@code null} instead when
+    * {@code in.remaining()} is smaller than the announced body length.
+    */
+   @Override
+   protected ProducerReceiveTokensMessage decodeBody(RemotingBuffer in) throws Exception
+   {
+      final int announcedLength = in.getInt();
+      final boolean bodyComplete = in.remaining() >= announcedLength;
+      return bodyComplete ? new ProducerReceiveTokensMessage(in.getInt()) : null;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java	2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+/**
+ * Packet of type {@link PacketType#PROD_RECEIVETOKENS} whose only payload is an
+ * immutable token count.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ProducerReceiveTokensMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final int tokens;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /** @param tokens the token count this packet carries */
+   public ProducerReceiveTokensMessage(final int tokens)
+   {
+      super(PacketType.PROD_RECEIVETOKENS);
+
+      this.tokens = tokens;
+   }
+
+   // Public --------------------------------------------------------
+
+   /** @return the token count given at construction */
+   public int getTokens()
+   {
+      return tokens;
+   }
+
+   /** Renders the parent's string form followed by {@code ", tokens=<n>]"}. */
+   @Override
+   public String toString()
+   {
+      final StringBuilder text = new StringBuilder(getParentString());
+      return text.append(", tokens=").append(tokens).append("]").toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/server/FlowController.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/FlowController.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/FlowController.java	2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+
+/**
+ * Token-based flow control for {@link ServerProducer}s.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public interface FlowController
+{
+   /** Associates {@code producer} with this controller. */
+   void registerProducer(ServerProducer producer);
+
+   /** Notifies this controller that a message has been acknowledged. */
+   void messageAcknowledged() throws Exception;
+
+   /** Asks this controller to check whether {@code producer} can be given tokens. */
+   void checkTokens(ServerProducer producer) throws Exception;
+}

Added: trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java	2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,139 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.List;
+
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.FlowController;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerProducer;
+
+/**
+ * {@link FlowController} that grants a producer credits in fixed batches, drawn
+ * from a pot of tokens sized by the free space of the queues bound to an address.
+ * NOTE(review): tokenPot and producer are not synchronized although waiting is
+ * volatile - confirm callers never use an instance from several threads at once.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class FlowControllerImpl implements FlowController
+{
+   /** Number of credits sent to a producer in one go. */
+   private final int batchSize = 50;
+
+   private int tokenPot;
+
+   private final PostOffice postOffice;
+
+   private final int limit;
+
+   private ServerProducer producer;
+
+   /** Set when a batch could not be granted; cleared once an acknowledgement allows one. */
+   private volatile boolean waiting;
+
+   private final String address;
+
+   /** Creates a controller for {@code address} and takes a first reading of the available tokens. */
+   public FlowControllerImpl(String address, PostOffice postOffice, int limit) throws Exception
+   {
+      this.address = address;
+      this.postOffice = postOffice;
+      this.limit = limit;
+      getTokens();
+   }
+
+   public void registerProducer(ServerProducer producer)
+   {
+      this.producer = producer;
+   }
+
+   /**
+    * Adds to the pot the smallest free space ({@code limit - messageCount}, floored
+    * at zero) over all queues bound to the address.
+    */
+   private void getTokens() throws Exception
+   {
+      List<Binding> bindings = postOffice.getBindingsForAddress(address);
+      int minAvailable = Integer.MAX_VALUE;
+      for (Binding binding : bindings)
+      {
+         Queue queue = binding.getQueue();
+         int available = Math.max(0, limit - queue.getMessageCount());
+         minAvailable = Math.min(available, minAvailable);
+      }
+
+      // With no bindings minAvailable stays at MAX_VALUE; saturate instead of
+      // letting a non-empty pot wrap around to a negative value.
+      tokenPot = (int) Math.min((long) tokenPot + minAvailable, Integer.MAX_VALUE);
+   }
+
+   /** Moves one batch from the pot to {@code target} if the pot holds one; returns whether it did. */
+   private boolean grantBatch(ServerProducer target) throws Exception
+   {
+      if (tokenPot < batchSize)
+      {
+         return false;
+      }
+      tokenPot -= batchSize;
+      target.sendCredits(batchSize);
+      return true;
+   }
+
+   /** If a producer is waiting, refills the pot and grants the owed batch when possible. */
+   public void messageAcknowledged() throws Exception
+   {
+      // Without a registered producer nobody can be credited; stay in the waiting
+      // state rather than failing with a NullPointerException mid-update.
+      if (!waiting || producer == null)
+      {
+         return;
+      }
+
+      getTokens();
+
+      if (tokenPot >= batchSize)
+      {
+         waiting = false;
+         grantBatch(producer);
+      }
+   }
+
+   /** Grants {@code producer} a batch, refilling the pot once if needed; otherwise starts waiting. */
+   public void checkTokens(ServerProducer producer) throws Exception
+   {
+      if (grantBatch(producer) || waiting)
+      {
+         return;
+      }
+
+      // Try and get some more
+      getTokens();
+
+      if (!grantBatch(producer))
+      {
+         waiting = true;
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list