[jboss-cvs] JBoss Messaging SVN: r3828 - in trunk/src/main/org/jboss/messaging/core: remoting/impl/codec and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Feb 28 05:55:34 EST 2008
Author: timfox
Date: 2008-02-28 05:55:34 -0500 (Thu, 28 Feb 2008)
New Revision: 3828
Added:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
trunk/src/main/org/jboss/messaging/core/server/FlowController.java
trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java
Log:
New files
Added: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerInternal.java 2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,21 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.client.ClientProducer;
+
+/**
+ * A {@link ClientProducer} that can additionally be handed tokens through
+ * {@link #receiveTokens(int)}, a call that may fail with any {@code Exception}.
+ * ClientProducerPacketHandler invokes it for each PROD_RECEIVETOKENS packet it handles.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public interface ClientProducerInternal extends ClientProducer
+{
+ void receiveTokens(int tokens) throws Exception;
+}
Added: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerPacketHandler.java 2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,65 @@
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketType;
+
+/**
+ *
+ * A ClientProducerPacketHandler
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ClientProducerPacketHandler implements PacketHandler
+{
+ private static final Logger log = Logger.getLogger(ClientProducerPacketHandler.class);
+
+ private final ClientProducerInternal clientProducer;
+
+ private final String producerID;
+
+ public ClientProducerPacketHandler(final ClientProducerInternal clientProducer, final String producerID)
+ {
+ this.clientProducer = clientProducer;
+
+ this.producerID = producerID;
+ }
+
+ public String getID()
+ {
+ return producerID;
+ }
+
+ public void handle(final Packet packet, final PacketSender sender)
+ {
+ try
+ {
+ PacketType type = packet.getType();
+
+ if (type == PacketType.PROD_RECEIVETOKENS)
+ {
+ ProducerReceiveTokensMessage message = (ProducerReceiveTokensMessage) packet;
+
+ clientProducer.receiveTokens(message.getTokens());
+ }
+ else
+ {
+ throw new IllegalStateException("Invalid packet: " + type);
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle packet", e);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ClientProducerPacketHandler[id=" + producerID + "]";
+ }
+}
\ No newline at end of file
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ProducerReceiveTokensMessageCodec.java 2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.codec;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketType.PROD_RECEIVETOKENS;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.ProducerReceiveTokensMessage;
+
+/**
+ * Codec for {@link ProducerReceiveTokensMessage} packets (type PROD_RECEIVETOKENS).
+ * The body is written as two ints: INT_LENGTH, then the token count.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ProducerReceiveTokensMessageCodec extends AbstractPacketCodec<ProducerReceiveTokensMessage>
+{
+   // Constructors --------------------------------------------------
+
+   /** Creates the codec for the PROD_RECEIVETOKENS packet type. */
+   public ProducerReceiveTokensMessageCodec()
+   {
+      super(PROD_RECEIVETOKENS);
+   }
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes INT_LENGTH (read back by decodeBody as the body length), then the tokens.
+    *
+    * @param message packet whose token count is written
+    * @param out buffer receiving the two ints
+    */
+   @Override
+   protected void encodeBody(ProducerReceiveTokensMessage message, RemotingBuffer out) throws Exception
+   {
+      out.putInt(INT_LENGTH);
+
+      final int tokens = message.getTokens();
+      out.putInt(tokens);
+   }
+
+   /**
+    * Reads the leading length int, then the token count if enough input remains.
+    *
+    * @param in buffer the body is read from
+    * @return the decoded message, or null when in.remaining() is below the length read
+    */
+   @Override
+   protected ProducerReceiveTokensMessage decodeBody(RemotingBuffer in)
+      throws Exception
+   {
+      final int announcedLength = in.getInt();
+
+      if (in.remaining() >= announcedLength)
+      {
+         final int tokens = in.getInt();
+         return new ProducerReceiveTokensMessage(tokens);
+      }
+
+      return null;
+   }
+}
+
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ProducerReceiveTokensMessage.java 2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+/**
+ * Packet of type PROD_RECEIVETOKENS carrying a fixed token count.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ProducerReceiveTokensMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final int tokens;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param tokens token count carried by this packet
+    */
+   public ProducerReceiveTokensMessage(final int tokens)
+   {
+      super(PacketType.PROD_RECEIVETOKENS);
+
+      this.tokens = tokens;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the token count given at construction
+    */
+   public int getTokens()
+   {
+      return tokens;
+   }
+
+   /**
+    * @return the parent's string form followed by {@code ", tokens=N]"}
+    */
+   @Override
+   public String toString()
+   {
+      return new StringBuilder(getParentString())
+         .append(", tokens=")
+         .append(tokens)
+         .append("]")
+         .toString();
+   }
+}
Added: trunk/src/main/org/jboss/messaging/core/server/FlowController.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/FlowController.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/FlowController.java 2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+
+/**
+ * Server-side contract for granting send credits to a {@link ServerProducer}.
+ * In FlowControllerImpl, checkTokens sends the given producer a batch of credits when enough
+ * tokens exist; messageAcknowledged does so for the registered producer if a request waited.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public interface FlowController
+{
+ void registerProducer(ServerProducer producer);
+
+ void messageAcknowledged() throws Exception;
+
+ void checkTokens(ServerProducer producer) throws Exception;
+}
Added: trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/FlowControllerImpl.java 2008-02-28 10:55:34 UTC (rev 3828)
@@ -0,0 +1,139 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.List;
+
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.FlowController;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerProducer;
+
+/**
+ * Token-based {@link FlowController} for one address: a pot topped up from the spare
+ * capacity of the bound queues and paid out to producers in batches of BATCH_SIZE credits.
+ * <p>NOTE(review): only waiting is volatile; tokenPot and producer are unsynchronised,
+ * so confirm how callers serialise access before sharing an instance across threads.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class FlowControllerImpl implements FlowController
+{
+   /** Number of credits sent to a producer in one go. */
+   private static final int BATCH_SIZE = 50;
+
+   private final String address;
+   private final PostOffice postOffice;
+   private final int limit;
+
+   /** Tokens collected but not yet sent out as credits. */
+   private int tokenPot;
+   private ServerProducer producer;
+   /** True while a request is parked because the pot held less than a batch. */
+   private volatile boolean waiting;
+
+   /**
+    * Stores the arguments and fills the pot once from the current queue state.
+    * @param address address whose bindings are inspected
+    * @param postOffice source of the bindings for {@code address}
+    * @param limit per-queue message count used to compute spare capacity
+    */
+   public FlowControllerImpl(String address, PostOffice postOffice, int limit) throws Exception
+   {
+      this.address = address;
+      this.postOffice = postOffice;
+      this.limit = limit;
+      getTokens();
+   }
+
+   /** Sets the producer that messageAcknowledged() sends credits to. */
+   public void registerProducer(ServerProducer producer)
+   {
+      this.producer = producer;
+   }
+
+   /**
+    * Adds to the pot the smallest spare capacity (limit - messageCount, floored at
+    * zero) over the queues bound to the address, or Integer.MAX_VALUE when nothing
+    * is bound. The sum saturates at Integer.MAX_VALUE rather than wrapping negative.
+    */
+   private void getTokens() throws Exception
+   {
+      List<Binding> bindings = postOffice.getBindingsForAddress(address);
+      int minAvailable = Integer.MAX_VALUE;
+
+      for (Binding binding : bindings)
+      {
+         Queue queue = binding.getQueue();
+         int available = Math.max(0, limit - queue.getMessageCount());
+         minAvailable = Math.min(available, minAvailable);
+      }
+
+      // Add in long arithmetic: tokenPot + minAvailable can exceed the int range.
+      tokenPot = (int) Math.min((long) tokenPot + minAvailable, Integer.MAX_VALUE);
+   }
+
+   /**
+    * If a request is parked, tops up the pot and, when it holds a full batch, sends
+    * that batch to the registered producer and clears the waiting state. Does
+    * nothing while no producer is registered, so the request stays parked.
+    */
+   public void messageAcknowledged() throws Exception
+   {
+      if (waiting && producer != null)
+      {
+         getTokens();
+         if (tokenPot >= BATCH_SIZE)
+         {
+            tokenPot -= BATCH_SIZE;
+            waiting = false;
+            producer.sendCredits(BATCH_SIZE);
+         }
+      }
+   }
+
+   /**
+    * Sends {@code producer} one batch of credits if the pot holds, or can be topped
+    * up to, a full batch; otherwise parks the request until an acknowledgement.
+    * While a request is parked and the pot is short, further calls do nothing.
+    */
+   public void checkTokens(ServerProducer producer) throws Exception
+   {
+      if (tokenPot < BATCH_SIZE)
+      {
+         if (waiting)
+         {
+            return;
+         }
+         // Try and get some more
+         getTokens();
+         if (tokenPot < BATCH_SIZE)
+         {
+            waiting = true;
+            return;
+         }
+      }
+
+      tokenPot -= BATCH_SIZE;
+      producer.sendCredits(BATCH_SIZE);
+   }
+}
More information about the jboss-cvs-commits
mailing list