分类 Java 下的文章

RPC客户端负载均衡

0x00 负载均衡的插入点

一般是客户端或调用方进行负载均衡的处理。

0x01 常见的负载均衡策略

LocalFirst-本地优先

获取所有的本地服务提供者,按照一定顺序,取得被调用次数最小的服务提供者。

public class LocalFirstLoadBalance {


    public static void main(String[] args) {
        LocalFirstLoadBalance balance = new LocalFirstLoadBalance();
        for (int i = 0; i < 10; i++) {
            Refer refer = balance.getRefer();
            System.out.println(refer);
            refer.incrActiveCount("");
        }
    }

    public Refer getRefer() {
        List<Refer> refers = getLocalRefers();
        Refer refer = null;
        for (int i = 0; i < refers.size(); i++) {
            int index = i % refers.size();
            Refer temp = refers.get(index);
            if (!ReferUtils.isReferAvailable(temp)) {
                continue;
            }

            if (refer == null) {
                refer = temp;
            } else {
                if (compare(refer, temp) > 0) {
                    refer = temp;
                }
            }

        }
        return refer;
    }

    private int compare(Refer referer1, Refer referer2) {
        return referer1.activeRefererCount() - referer2.activeRefererCount();
    }

    private List<Refer> getLocalRefers() {
        List<Refer> allRefers = ReferUtils.getAllRefers();
        List<Refer> collect = allRefers.stream().filter(ReferUtils::isLocalRefer).collect(Collectors.toList());
        return collect;
    }

}

- 阅读剩余部分 -

0x00 使用java的Future

java的java.util.concurrent.Future接口提供了简单的等待获取结果的功能,可以使用该类完成异步转同步的功能。

public class FutureTest {

    public static void main(String[] args) {
        FutureTest test = new FutureTest();
        try {
            System.out.println(test.get("1"));
            System.out.println(test.get("2"));
            System.out.println(test.get("3"));
            System.out.println(test.get("4"));
            System.out.println(test.get("5"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    
    // 传入参数,获取结果 
    public String get(String para) throws ExecutionException, InterruptedException { 
        // 新建FutureTask       
        FutureTask<String> futureTask = new FutureTask<>(new MyCallable<>(para));
        // 启动FutureTask
        futureTask.run();
        // 同步获取结果
        return futureTask.get();
    }
    
    // 异步处理
    class MyCallable<T> implements Callable<T> {

        T para;

        MyCallable(T para) {
            this.para = para;
        }

        @Override
        public T call() throws Exception {
            // 将耗时的网络操作放在这里
            Thread.sleep(1000);
            return this.para;
        }
    }
}

这样做的优点如下:
1. 工作量小,使用java自带的功能即可完成,无需编写大量代码
2. 依赖于java自己的concurrent框架,可靠稳定

可以使用ExecutorService.submit来执行Callable,提高效率

没·

0x01 自己进行管理

流行的RPC框架目前没有直接使用Future的,都是自己写了一套类似于Future的,比如dubbo和motan。自己进行管理的优势就是可定制化,可以追踪每条请求的生命周期,可以对所有请求进行同一管理等。

更重要的原因是,RPC常使用TCP进行通信,而不是HTTP,所以发送请求后无法等待特定结果。同时同一TCP连接可能要处理好多请求,所以一般会为每个请求分配一个ID,当收到响应时再根据ID去处理对应的future。

public class RPCTest {

    // 维护请求id
    protected ConcurrentMap<Integer, ResultFuture> callbackMap = new ConcurrentHashMap<Integer, ResultFuture>();

    public static void main(String[] args) {
        RPCTest test = new RPCTest();

        ResultFuture<String> future = test.new ResultFuture<String>();
        // 注册Future
        test.registerFuture(1, future);
        // 模拟网络请求,网络处理可以同一处理
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 网络操作
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 将请求结果更新
                test.updateFuture(1, "1");
            }
        }).start();
        
        // 获取返回结果
        try {
            String s = future.get();
            System.out.println(s);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 更新网络操作执行后的结果
    public void updateFuture(int sn, String value) {
        ResultFuture future = this.callbackMap.get(sn);
        if (future != null) {
            future.setValue(value);
        }
    }

    // 注册请求
    public void registerFuture(int sn, ResultFuture future) {
        this.callbackMap.put(sn, future);
    }

    // 请求类,类似于java的Future
    class ResultFuture<T> {
        final Lock lock = new ReentrantLock();
        final Condition done = lock.newCondition();
        T value;

        public T getValue() {
            return value;
        }

        public void setValue(T value) {
            lock.lock();
            try {
                this.value = value;

                if (done != null) {
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
        }

        public T get() throws Exception {
            return get(3000);
        }

        public T get(int timeout) throws Exception {
            if (timeout <= 0) {
                timeout = 3000;
            }
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(500, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    throw new Exception("调用超时");
                }
            }
            return value;
        }

        public boolean isDone() {
            return value != null;
        }
    }
}

0x00 基础

多例bean和单例bean,都使用注解@Component纳入到spring的管理,不同之处是多例bean使用注解@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)标注,而单例bean是默认的,或者显示的使用注解@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)标注。

- 阅读剩余部分 -