0x00 使用java的Future

java的java.util.concurrent.Future接口提供了简单的等待获取结果的功能,可以使用该类完成异步转同步的功能。

public class FutureTest {

    public static void main(String[] args) {
        FutureTest test = new FutureTest();
        try {
            System.out.println(test.get("1"));
            System.out.println(test.get("2"));
            System.out.println(test.get("3"));
            System.out.println(test.get("4"));
            System.out.println(test.get("5"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    
    // 传入参数,获取结果 
    public String get(String para) throws ExecutionException, InterruptedException { 
        // 新建FutureTask       
        FutureTask<String> futureTask = new FutureTask<>(new MyCallable<>(para));
        // 启动FutureTask
        futureTask.run();
        // 同步获取结果
        return futureTask.get();
    }
    
    // 异步处理
    class MyCallable<T> implements Callable<T> {

        T para;

        MyCallable(T para) {
            this.para = para;
        }

        @Override
        public T call() throws Exception {
            // 将耗时的网络操作放在这里
            Thread.sleep(1000);
            return this.para;
        }
    }
}

这样做的优点如下:

  1. 工作量小,使用java自带的功能即可完成,无需编写大量代码
  2. 依赖于java自己的concurrent框架,可靠稳定

可以使用ExecutorService.submit来执行Callable,提高效率

没·

0x01 自己进行管理

流行的RPC框架目前没有直接使用Future的,都是自己写了一套类似于Future的,比如dubbo和motan。自己进行管理的优势就是可定制化,可以追踪每条请求的生命周期,可以对所有请求进行同一管理等。

public class RPCTest {

    // 维护请求id
    protected ConcurrentMap<Integer, ResultFuture> callbackMap = new ConcurrentHashMap<Integer, ResultFuture>();

    public static void main(String[] args) {
        RPCTest test = new RPCTest();

        ResultFuture<String> future = test.new ResultFuture<String>();
        // 注册Future
        test.registerFuture(1, future);
        // 模拟网络请求,网络处理可以同一处理
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 网络操作
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 将请求结果更新
                test.updateFuture(1, "1");
            }
        }).start();
        
        // 获取返回结果
        try {
            String s = future.get();
            System.out.println(s);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 更新网络操作执行后的结果
    public void updateFuture(int sn, String value) {
        ResultFuture future = this.callbackMap.get(sn);
        if (future != null) {
            future.setValue(value);
        }
    }

    // 注册请求
    public void registerFuture(int sn, ResultFuture future) {
        this.callbackMap.put(sn, future);
    }

    // 请求类,类似于java的Future
    class ResultFuture<T> {
        final Lock lock = new ReentrantLock();
        final Condition done = lock.newCondition();
        T value;

        public T getValue() {
            return value;
        }

        public void setValue(T value) {
            lock.lock();
            try {
                this.value = value;

                if (done != null) {
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
        }

        public T get() throws Exception {
            return get(3000);
        }

        public T get(int timeout) throws Exception {
            if (timeout <= 0) {
                timeout = 3000;
            }
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(500, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    throw new Exception("调用超时");
                }
            }
            return value;
        }

        public boolean isDone() {
            return value != null;
        }
    }
}

标签: none

添加新评论