RPC异步转同步
0x00 使用java的Future
java的java.util.concurrent.Future接口提供了简单的等待获取结果的功能,可以使用该类完成异步转同步的功能。
public class FutureTest {
public static void main(String[] args) {
FutureTest test = new FutureTest();
try {
System.out.println(test.get("1"));
System.out.println(test.get("2"));
System.out.println(test.get("3"));
System.out.println(test.get("4"));
System.out.println(test.get("5"));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 传入参数,获取结果
public String get(String para) throws ExecutionException, InterruptedException {
// 新建FutureTask
FutureTask<String> futureTask = new FutureTask<>(new MyCallable<>(para));
// 启动FutureTask
futureTask.run();
// 同步获取结果
return futureTask.get();
}
// 异步处理
class MyCallable<T> implements Callable<T> {
T para;
MyCallable(T para) {
this.para = para;
}
@Override
public T call() throws Exception {
// 将耗时的网络操作放在这里
Thread.sleep(1000);
return this.para;
}
}
}
这样做的优点如下:
1. 工作量小,使用java自带的功能即可完成,无需编写大量代码
2. 依赖于java自己的concurrent框架,可靠稳定
可以使用ExecutorService.submit来执行Callable,提高效率
没·
0x01 自己进行管理
流行的RPC框架目前没有直接使用Future的,都是自己写了一套类似于Future的,比如dubbo和motan。自己进行管理的优势就是可定制化,可以追踪每条请求的生命周期,可以对所有请求进行同一管理等。
更重要的原因是,RPC常使用TCP进行通信,而不是HTTP,所以发送请求后无法等待特定结果。同时同一TCP连接可能要处理好多请求,所以一般会为每个请求分配一个ID,当收到响应时再根据ID去处理对应的future。
public class RPCTest {
// 维护请求id
protected ConcurrentMap<Integer, ResultFuture> callbackMap = new ConcurrentHashMap<Integer, ResultFuture>();
public static void main(String[] args) {
RPCTest test = new RPCTest();
ResultFuture<String> future = test.new ResultFuture<String>();
// 注册Future
test.registerFuture(1, future);
// 模拟网络请求,网络处理可以同一处理
new Thread(new Runnable() {
@Override
public void run() {
try {
// 网络操作
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 将请求结果更新
test.updateFuture(1, "1");
}
}).start();
// 获取返回结果
try {
String s = future.get();
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}
// 更新网络操作执行后的结果
public void updateFuture(int sn, String value) {
ResultFuture future = this.callbackMap.get(sn);
if (future != null) {
future.setValue(value);
}
}
// 注册请求
public void registerFuture(int sn, ResultFuture future) {
this.callbackMap.put(sn, future);
}
// 请求类,类似于java的Future
class ResultFuture<T> {
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
T value;
public T getValue() {
return value;
}
public void setValue(T value) {
lock.lock();
try {
this.value = value;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}
public T get() throws Exception {
return get(3000);
}
public T get(int timeout) throws Exception {
if (timeout <= 0) {
timeout = 3000;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(500, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new Exception("调用超时");
}
}
return value;
}
public boolean isDone() {
return value != null;
}
}
}