Can you share ChannelFactory / worker threads in a p2p app?
Trustin Lee (이희승)
trustin at gmail.com
Mon Nov 30 17:26:06 EST 2009
Thanks a lot! Your patch looks fine to me. I will apply it soon.
— Trustin Lee, http://gleamynode.net/
On Mon, Nov 23, 2009 at 10:38 AM, Thingfish <jannick at ovja.dk> wrote:
>
> There is probably not much of a performance difference looking at the
> individual app. I want to be able to run a large number of instances on a
> single machine (for testing), and number of threads might become an issue in
> this case.
>
> I've put a simple (as of yet untested) suggestion together, as for how
> worker sharing might be archived. I noticed that NioServerSocketPipelineSink
> and NioClientSocketPipelineSink implements the same round-robin load
> balancing over a worker array, and that the sinks make no use of
> workerExecutor and workerCount once the worker array has been constructed.
> I've factored this out into a NioWorkerPool class, and changed the sinks to
> receive this instead. The channel factories have had a new constructor
> added, that takes a pre-constructed workerPool, thereby making it possible
> to share the pool between clients and servers.
>
> Have included the code below (as a patch against trunk). A few issues: Need
> to assign a useful bossID to a shared worker, and also change NioWorker so
> that an appropriate threadName is selected. Do you foresee any issues with
> this solution?
>
> Regards
> Jannick Bitsch
>
>
> ### Eclipse Workspace Patch 1.0
> #P Netty
> Index:
> src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java
> ===================================================================
> ---
> src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java
> (revision
> 1904)
> +++
> src/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java
> (working
> copy)
> @@ -118,23 +118,30 @@
> public NioClientSocketChannelFactory(
> Executor bossExecutor, Executor workerExecutor,
> int workerCount) {
> + this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
> + }
> +
> +
> + /**
> + * Creates a new instance.
> + *
> + * @param bossExecutor
> + * the {@link Executor} which will execute the boss thread
> + * @param workerPool
> + * the {@link NioWorkerPool} which will supply nio workers
> + */
> + public NioClientSocketChannelFactory(
> + Executor bossExecutor, NioWorkerPool workerPool) {
> if (bossExecutor == null) {
> throw new NullPointerException("bossExecutor");
> }
> - if (workerExecutor == null) {
> - throw new NullPointerException("workerExecutor");
> - }
> - if (workerCount <= 0) {
> - throw new IllegalArgumentException(
> - "workerCount (" + workerCount + ") " +
> - "must be a positive integer.");
> - }
>
> this.bossExecutor = bossExecutor;
> - this.workerExecutor = workerExecutor;
> - sink = new NioClientSocketPipelineSink(bossExecutor,
> workerExecutor, workerCount);
> - }
> + this.workerExecutor = workerPool.workerExecutor;
> + sink = new NioClientSocketPipelineSink(bossExecutor, workerPool);
> + }
>
> +
> public SocketChannel newChannel(ChannelPipeline pipeline) {
> return new NioClientSocketChannel(this, pipeline, sink,
> sink.nextWorker());
> }
> Index:
> src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java
> (revision
> 1904)
> +++ src/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java
> (working
> copy)
> @@ -63,16 +63,12 @@
> final int id = nextId.incrementAndGet();
> final Executor bossExecutor;
> private final Boss boss = new Boss();
> - private final NioWorker[] workers;
> - private final AtomicInteger workerIndex = new AtomicInteger();
> + private final NioWorkerPool workerPool;
>
> NioClientSocketPipelineSink(
> - Executor bossExecutor, Executor workerExecutor, int
> workerCount) {
> + Executor bossExecutor, NioWorkerPool workerPool) {
> this.bossExecutor = bossExecutor;
> - workers = new NioWorker[workerCount];
> - for (int i = 0; i < workers.length; i ++) {
> - workers[i] = new NioWorker(id, i + 1, workerExecutor);
> - }
> + this.workerPool = workerPool;
> }
>
> public void eventSunk(
> @@ -160,8 +156,7 @@
> }
>
> NioWorker nextWorker() {
> - return workers[Math.abs(
> - workerIndex.getAndIncrement() % workers.length)];
> + return workerPool.nextWorker();
> }
>
> private final class Boss implements Runnable {
> Index:
> src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java
> (revision
> 1904)
> +++ src/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java
> (working
> copy)
> @@ -57,14 +57,10 @@
> private static final AtomicInteger nextId = new AtomicInteger();
>
> private final int id = nextId.incrementAndGet();
> - private final NioWorker[] workers;
> - private final AtomicInteger workerIndex = new AtomicInteger();
> + private final NioWorkerPool workerPool;
>
> - NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
> - workers = new NioWorker[workerCount];
> - for (int i = 0; i < workers.length; i ++) {
> - workers[i] = new NioWorker(id, i + 1, workerExecutor);
> - }
> + NioServerSocketPipelineSink(NioWorkerPool workerPool) {
> + this.workerPool = workerPool;
> }
>
> public void eventSunk(
> @@ -191,8 +187,7 @@
> }
>
> NioWorker nextWorker() {
> - return workers[Math.abs(
> - workerIndex.getAndIncrement() % workers.length)];
> + return workerPool.nextWorker();
> }
>
> private final class Boss implements Runnable {
> Index: src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java
> ===================================================================
> --- src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java (revision 0)
> +++ src/org/jboss/netty/channel/socket/nio/NioWorkerPool.java (revision 0)
> @@ -0,0 +1,34 @@
> +package org.jboss.netty.channel.socket.nio;
> +
> +import java.util.concurrent.Executor;
> +import java.util.concurrent.atomic.AtomicInteger;
> +
> +public class NioWorkerPool {
> +
> + private final NioWorker[] workers;
> + private final AtomicInteger workerIndex = new AtomicInteger();
> + final Executor workerExecutor;
> +
> + public NioWorkerPool(Executor workerExecutor, int workerCount){
> + if (workerExecutor == null) {
> + throw new NullPointerException("workerExecutor");
> + }
> + if (workerCount <= 0) {
> + throw new IllegalArgumentException(
> + "workerCount (" + workerCount + ") " +
> + "must be a positive integer.");
> + }
> +
> + this.workerExecutor = workerExecutor;
> +
> + workers = new NioWorker[workerCount];
> + for (int i = 0; i < workers.length; i ++) {
> + workers[i] = new NioWorker(0xBEEF, i + 1, workerExecutor);
> + }
> + }
> +
> + public NioWorker nextWorker() {
> + return workers[Math.abs(
> + workerIndex.getAndIncrement() % workers.length)];
> + }
> +}
> \ No newline at end of file
> Index:
> src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java
> ===================================================================
> ---
> src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java
> (revision
> 1904)
> +++
> src/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java
> (working
> copy)
> @@ -122,22 +122,34 @@
> public NioServerSocketChannelFactory(
> Executor bossExecutor, Executor workerExecutor,
> int workerCount) {
> + this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
> + }
> +
> + /**
> + * Creates a new instance.
> + *
> + * @param bossExecutor
> + * the {@link Executor} which will execute the boss threads
> + * @param workerPool
> + * the {@link NioWorkerPool} which will supply nio workers
> + */
> + public NioServerSocketChannelFactory(
> + Executor bossExecutor, NioWorkerPool workerPool){
> +
> if (bossExecutor == null) {
> throw new NullPointerException("bossExecutor");
> }
> - if (workerExecutor == null) {
> - throw new NullPointerException("workerExecutor");
> +
> + if (workerPool == null) {
> + throw new NullPointerException("workerPool");
> }
> - if (workerCount <= 0) {
> - throw new IllegalArgumentException(
> - "workerCount (" + workerCount + ") " +
> - "must be a positive integer.");
> - }
> - this.bossExecutor = bossExecutor;
> - this.workerExecutor = workerExecutor;
> - sink = new NioServerSocketPipelineSink(workerExecutor,
> workerCount);
> +
> + this.bossExecutor = bossExecutor;
> + this.workerExecutor = workerPool.workerExecutor;
> + sink = new NioServerSocketPipelineSink(workerPool);
> }
> -
> +
> +
> public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
> return new NioServerSocketChannel(this, pipeline, sink);
> }
>
>
>
>
> 2009/11/23 Trustin Lee [via Netty Forums and Mailing Lists] <
> ml-node+4048335-1313223483 at n2.nabble.com<ml-node%2B4048335-1313223483 at n2.nabble.com>
>>
>
>> When I design Netty API, sharing worker threads for client and server
>> side seemed to complicate the API too much comparing to the gained
>> performance, and therefore the worker threads are not shared by
>> different ChannelFactories.
>>
>> If there is obvious advantage in performance, I'd like to implement it
>> though. Please let me know if you find something.
>>
>> Thanks
>>
>> — Trustin Lee, http://gleamynode.net/
>>
>>
>>
>> On Mon, Nov 23, 2009 at 8:42 AM, Thingfish <[hidden email]<http://n2.nabble.com/user/SendEmail.jtp?type=node&node=4048335&i=0>>
>> wrote:
>>
>> >
>> > Hi
>> >
>> > I'm currently in the process of coding a peer 2 peer app on top of Netty.
>> By
>> > definition each peer acts as both a server and a client, which means that
>> I
>> > currently make use of both the ClientBootstrap and ServerBootstrap to
>> > support "connection setup" in both directions. Once established
>> "incomming"
>> > and "outgoing" connections are indistinct able, and for this reason they
>> > share the same pipeline factory.
>> >
>> > My question is, if its possible to also have them share the same worker
>> > threads and selectors? Currently the use of seperate NioClient and
>> NioServer
>> > SocketChannelFactories means, that two disjoint sets of worker threads
>> and
>> > I/O resources are created. As both these worker pools deliver the exact
>> same
>> > service, it would be more efficient to avoid the extra threads.
>> >
>> > --
>> > View this message in context:
>> http://n2.nabble.com/Can-you-share-ChannelFactory-worker-threads-in-a-p2p-app-tp4048241p4048241.html
>> > Sent from the Netty User Group mailing list archive at Nabble.com.
>> > _______________________________________________
>> > netty-users mailing list
>> > [hidden email]<http://n2.nabble.com/user/SendEmail.jtp?type=node&node=4048335&i=1>
>> > https://lists.jboss.org/mailman/listinfo/netty-users
>> >
>>
>> _______________________________________________
>> netty-users mailing list
>> [hidden email]<http://n2.nabble.com/user/SendEmail.jtp?type=node&node=4048335&i=2>
>> https://lists.jboss.org/mailman/listinfo/netty-users
>> — Trustin Lee, http://gleamynode.net/
>>
>>
>> ------------------------------
>> View message @
>> http://n2.nabble.com/Can-you-share-ChannelFactory-worker-threads-in-a-p2p-app-tp4048241p4048335.html
>> To unsubscribe from Can you share ChannelFactory / worker threads in a p2p
>> app?, click here< (link removed) =>.
>>
>>
>>
>
> --
> View this message in context: http://n2.nabble.com/Can-you-share-ChannelFactory-worker-threads-in-a-p2p-app-tp4048241p4048535.html
> Sent from the Netty User Group mailing list archive at Nabble.com.
>
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users
>
More information about the netty-users
mailing list