Talking about Scatter-gather Pattern

pic

Abstraction

客户端广告位展示时,会有多种展示策略,如果某个广告位没有被商家买断,那么该广告位一般按展示次数收费。为了从该广告位得到最好的收入,就要让多个广告主竞价,单次出价最高的广告主得到该广告位。

在这个竞价策略下,客户端(Android、iOS,any front-end)的View在展示之前,需要同时向多个ads provider(API)发起广告位竞价,谁给的钱最多,View就展示谁的广告内容。这个场景可以简化成Scatter-gather的模式。这里实现一个Java Demo,使用CountDownLatch和CompletableFuture很简单地解决这种parallel场景的Aggregator和follow-up问题。

API

我们假设运行环境是Android(Java),多个API同时请求(parallel),在运行时保证并发执行,最后等待并整合结果,返回给调用方。下面是为了抽象问题,设计出来的模拟数据结构。

// 广告主
interface Provider {
    public void request(); 
}
// 广告位Model
interface ComsumerModel {    
    public void request(Property, ViewCallback);
}
// 广告位View
class AdsView extends View implements ViewCallback {
    private ComsumerModel model;
    public void show() {
        model.request(getProperty(), this);
    }
}
// 一种类型的广告位
class BannerComsumerModel implements ComsumerModel {
    // @param ViewClassback The main thread Handler wrapper
    public void request(final Set<Provider> providers, Handler callback) {
        requestAsync(providers, callback);
    }
    // run in a work-thread.
    public void requestAsync(final Set<Provider> providers, Handler callback) {
        worksPool.submit( run() -> {
                providers.foreach {
                    it.request();
                }
                // ... How to get the right result?
                ViewClassback.sendMessage(result);
            }
        });
    }
}

Implementation

requestAsync方法中同时请求了多个API(广告主),下面aggregator的逻辑功能,并忽略了次要部分。

1. CountDownLatch(JDK5)
run() -> {
    // ...
    CountDownLatch latch = new CountDownLatch(providers.size());
    providers.foreach {
        it.request();
    }
    latch.await(); // block until aggregator logic done  
    result = getBestResullt(providers);
    // ...
}
class A1Provider implements Provider {
    public void request() {
        // wating until we got our price
        latch.countDown();
    }
}
class A2Provider implements Provider {
    public void request() {
        // wating until we got our price
        latch.countDown();
    }
}

2. CompletableFuture
run() -> {
    if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.N) {
        A1Provider[] providers = build();
        CompletableFuture p1 = CompletableFuture.runAsync(providers[0]);
        CompletableFuture p2 = CompletableFuture.runAsync(providers[1]);
        CompletableFuture all = CompletableFuture.allOf(p1, p2);
        // block until aggregator logic done 
        all.get(); 
        result = getBestResullt(providers);
    }
    // ...
}
class A1Provider implements Provider, Runnable  {
    public void request() {
        // wating until we got our price
    }
    @Override
    public void run() { request(); }
}
class A2Provider implements Provider, Runnable {
    public void request() {
        // wating until we got our price
    }
    @Override
    public void run() { request(); }
}

3. Follow-up
run() -> {
    // ...
    CountDownLatch latch = new CountDownLatch(providers.size());
    providers.foreach {
        it.request();
    }
    // block until aggregator logic done or timeout
    latch.await(long timeout, TimeUnit unit);
    result = getBestResullt(providers);
    // ...
}
// if A2Provider is the most priority.
class A2Provider implements Provider {
    public void request() {
        // wating until we got our price
        latch.countDown();
        if(latch.getCount() > 1) {
            worksPool.shutdownNow();   
        }
    }
}
4. Exception handling

设计细节上就需要考虑内存泄露,线程对象中断,等问题了。

5. Testing

如何设计测试数据,测试逻辑,其实比解决问题更难,但也有可参考:

https://github.com/mbasiuk/tech/wiki/F.I.R.S.T-Principles-of-Unit-Testing

6. Performace & Optimzation

使用分配池边界等。略。

最后,除了Aggregator,其他问题没有给出具体方法,但这个模式的问题出现比较广泛,例如电商,机票等竞价有关的场景中。可以参考。

References:

https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html