TL;DR: 本文介绍一个手写的 Java RPC 框架 pig-rpc,基于 Socket 通信、Java 动态代理、ZooKeeper 服务注册与发现,实现了一个完整的分布式服务调用框架。包含服务发布/订阅、负载均衡(随机策略)、服务缓存、单例工厂、线程池管理等核心组件,代码结构清晰,适合作为理解 RPC 原理的入门项目。
一、为什么需要 RPC
在分布式系统中,服务通常部署在不同的机器上。一个简单的 HTTP 调用在跨进程场景下会产生大量开销,而 RPC(Remote Procedure Call)通过自定义通信协议和序列化机制,能够比 HTTP 更快、更灵活地实现跨进程调用。
本文介绍的 pig-rpc 是一个教学级别的 RPC 框架实现,涵盖了 RPC 的核心要素,不依赖 Spring 等框架,适合理解 RPC 底层原理。
本项目面向具备 Java 基础和分布式系统基本概念的开发者在阅读前建议熟悉:接口与实现分离、单例模式、线程池、Socket 通信基础。
二、系统架构
┌─────────────────────────────────┐
│ Client │
│ │
│ ProxyUtils.getProxy(UserService.class)
│ ↓ │
│ ┌─────────────────────────┐ │
│ │ RpcClientProxy │ │
│ │ (JDK Dynamic Proxy) │ │
│ └────────────┬──────────────┘ │
│ ↓ │
│ ┌─────────────────────────┐ │
│ │ SocketRpcClient │ │
│ │ (ServiceDiscovery lookup)│ │
│ └────────────┬──────────────┘ │
└────────────────┼────────────────┘
│
┌──────────────────────────┼──────────────────────────┐
│ ↓ │
│ ┌──────────────────────────────┐ │
│ │ ZooKeeper (注册中心) │ │
│ │/pig-rpc/com.pig.api.UserService│ │
│ │ /192.168.79.1:8888 │ │
│ └──────────────────────────────┘ │
│ ↑ │
└──────────────────────────┼──────────────────────────┘
│
┌────────────────┼────────────────┐
│ ↓ │
│ ┌──────────────────────────┐ │
│ │ SocketRpcServer │ │
│ │ (ServerSocket.accept) │ │
│ └────────────┬─────────────┘ │
│ ↓ │
│ ┌──────────────────────────┐ │
│ │ SocketReqHandler │ │
│ │ (per-connection) │ │
│ └────────────┬─────────────┘ │
│ ↓ │
│ ┌──────────────────────────┐ │
│ │ RpcRequestHandler │ │
│ │ (reflection invoke) │ │
│ └────────────┬─────────────┘ │
│ ↓ │
│ ┌──────────────────────────┐ │
│ │ UserServiceImpl │ │
│ │ (业务实现) │ │
│ └──────────────────────────┘ │
│ │
│ Server │
└──────────────────────────────────┘三、核心模块实现
3.1 通信层:Socket 传输
客户端使用原生 Socket 发起请求,服务端使用 ServerSocket 接收连接:
// SocketRpcClient.java
public RpcResponse<?> sendReq(RpcRequest rpcRequest) {
// 1. 通过 ZooKeeper 发现服务地址
InetSocketAddress address = serviceDiscovery.lookupService(rpcRequest);
try (Socket socket = new Socket(address.getAddress(), address.getPort())) {
// 2. 序列化请求并发送
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rpcRequest);
outputStream.flush();
// 3. 读取响应
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
Object o = inputStream.readObject();
return (RpcResponse<?>) o;
} catch (Exception e) {
throw new RuntimeException(e);
}
}服务端为每个连接分配一个线程处理:
// SocketRpcServer.java
public void start() {
ShutdownHookUtils.clearAll();
try (ServerSocket serverSocket = new ServerSocket(port)) {
log.info("服务启动, 端口: {}", port);
Socket socket;
while ((socket = serverSocket.accept()) != null) {
// 每个连接一个线程处理
executor.submit(new SocketReqHandler(socket, rpcRequestHandler));
}
}
}3.2 请求与响应:DTO 设计
// RpcRequest.java
@Data
public class RpcRequest implements Serializable {
private String requestId; // 请求唯一 ID,用于关联响应
private String interfaceName; // 接口名
private String methodName; // 方法名
private Object[] params; // 参数列表
private Class<?>[] paramTypes; // 参数类型
private String version; // 版本号(支持多版本)
private String group; // 分组(支持分组隔离)
// rpcServiceName = interfaceName + version + group
public String rpcServiceName() {
return getInterfaceName()
+ StrUtil.blankToDefault(getVersion(), StrUtil.EMPTY)
+ StrUtil.blankToDefault(getGroup(), StrUtil.EMPTY);
}
} // RpcResponse.java
@Data
public class RpcResponse<T> implements Serializable {
private String requestId; // 关联请求 ID
private Integer code; // 状态码:0=成功,9999=失败
private String msg; // 错误信息
private T data; // 响应数据
public static <T> RpcResponse<T> success(String requestId, T data) {
RpcResponse<T> rpcResponse = new RpcResponse<T>();
rpcResponse.setRequestId(requestId);
rpcResponse.setCode(RpcResponseStatus.SUCCESS.getCode());
rpcResponse.setData(data);
return rpcResponse;
}
}3.3 动态代理:客户端调用透明化
RPC 客户端不需要知道底层 Socket 细节,通过 JDK 动态代理实现透明调用:
// RpcClientProxy.java
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1. 构建 RPC 请求
RpcRequest rpcRequest = RpcRequest.builder()
.requestId(IdUtil.fastSimpleUUID())
.interfaceName(method.getDeclaringClass().getCanonicalName())
.methodName(method.getName())
.params(args)
.paramTypes(method.getParameterTypes())
.version(config.getVersion())
.group(config.getGroup())
.build();
// 2. 发送请求(Socket 通信)
RpcResponse<?> rpcResponse = rpcClient.sendReq(rpcRequest);
// 3. 校验响应
check(rpcRequest, rpcResponse);
// 4. 返回结果(对调用方透明)
return rpcResponse.getData();
}用户调用方式完全透明:
// Client
UserService userService = ProxyUtils.getProxy(UserService.class);
User user = userService.getUser(1L); // 看起来像本地调用,实际是 RPC3.4 服务端请求处理:反射调用
// RpcRequestHandler.java
public Object invoke(RpcRequest rpcRequest) {
String rpcServiceName = rpcRequest.rpcServiceName();
// 1. 获取服务实例
Object service = serviceProvider.getService(rpcServiceName);
// 2. 通过反射调用方法
Method method = service.getClass().getMethod(
rpcRequest.getMethodName(),
rpcRequest.getParamTypes()
);
return method.invoke(service, rpcRequest.getParams());
}3.5 服务注册与发现:ZooKeeper
服务端发布服务时,将地址注册到 ZooKeeper:
// ZkServiceRegistry.java
public void registerService(String rpcServiceName, InetSocketAddress address) {
// /pig-rpc/com.pig.api.UserService/192.168.79.1:8888
String path = RpcConstant.ZK_RPC_ROOT_PATH
+ StrUtil.SLASH
+ rpcServiceName
+ StrUtil.SLASH
+ IPUtils.toIpPort(address);
zkClient.createPersistentNode(path);
}客户端发现服务时,从 ZooKeeper 读取可用地址列表:
// ZkServiceDiscovery.java
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
String path = RpcConstant.ZK_RPC_ROOT_PATH + StrUtil.SLASH + rpcRequest.rpcServiceName();
// 从 ZK 获取该服务的所有可用地址
List<String> children = zkClient.getChildrenNode(path);
// 负载均衡选择一个
String address = loadBalance.select(children);
return IPUtils.toInetSocketAddress(address);
}3.6 负载均衡:随机策略
// RandomLoadBalance.java
public String select(List<String> list) {
return RandomUtil.randomEle(list);
}3.7 单例工厂:组件管理
避免每次创建新的 ZooKeeper 连接等重量级对象:
// SingletonFactory.java
public static <T> T getInstance(Class<T> clazz) {
if (INSTANCE_CACHE.containsKey(clazz)) {
return clazz.cast(INSTANCE_CACHE.get(clazz));
}
synchronized (SingletonFactory.class) {
if (INSTANCE_CACHE.containsKey(clazz)) {
return clazz.cast(INSTANCE_CACHE.get(clazz));
}
T t = clazz.getDeclaredConstructor().newInstance();
INSTANCE_CACHE.put(clazz, t);
return t;
}
}3.8 线程池:IO 密集型优化
RPC 通信以 IO 为主,采用 IO 密集型线程池配置:
// ThreadPoolUtils.java
private static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
private static final int IO_INTENSIVE_NUM = CPU_NUM * 2;
public static ExecutorService createIoIntensiveThreadPool(String poolName) {
return createThreadPool(IO_INTENSIVE_NUM, poolName);
}四、项目结构
pig-rpc/
├── pom.xml # 父 POM,版本管理
├── rpc-core/ # 核心框架模块
│ ├── pom.xml # 依赖 Curator (ZK 客户端)
│ └── src/main/java/com/pig/rpc/
│ ├── constant/ # 常量定义(ZK 地址、端口)
│ ├── dto/ # RpcRequest / RpcResponse
│ ├── enums/ # RpcResponseStatus
│ ├── exception/ # RpcException
│ ├── factory/ # SingletonFactory
│ ├── handler/ # RpcRequestHandler(反射调用)
│ ├── loadbalance/ # 负载均衡接口 + 实现
│ ├── provider/ # 服务发布接口 + 实现
│ ├── proxy/ # RpcClientProxy(JDK 动态代理)
│ ├── registry/ # 服务注册/发现(ZK 实现)
│ ├── transmission/ # Socket 客户端/服务端
│ └── util/ # IPUtils、ThreadPoolUtils
├── test-api/ # 共享接口模块(服务端实现 + 客户端依赖)
│ └── src/main/java/com/pig/api/
│ ├── User.java # 传输对象
│ └── UserService.java # 服务接口
├── test-server/ # 服务端 Demo
│ └── src/main/java/com/pig/server/
│ ├── Main.java # 启动入口
│ └── service/UserServiceImpl.java # 服务实现
└── test-client/ # 客户端 Demo
└── src/main/java/com/pig/client/
├── Main.java # 调用入口
└── utils/ProxyUtils.java # 代理工具类五、服务端/客户端使用示例
服务端发布服务
// test-server/Main.java
public class Main {
public static void main(String[] args) {
// 1. 创建服务配置
RpcServiceConfig config = new RpcServiceConfig(new UserServiceImpl());
// 2. 创建并启动 RPC Server
RpcServer rpcServer = new SocketRpcServer();
rpcServer.publishService(config); // 发布到 ZK
rpcServer.start();
}
}客户端调用服务
// test-client/Main.java
public class Main {
public static void main(String[] args) {
// 1. 获取代理对象(完全透明,像本地调用一样)
UserService userService = ProxyUtils.getProxy(UserService.class);
// 2. 调用服务
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
User user = userService.getUser(1L);
System.out.println(user);
});
}
}
}六、关键技术细节
6.1 服务名设计:支持版本和分组
// rpcServiceName = interfaceName + version + group
// 例: com.pig.api.UserService + v1 + group1
// 支持:多版本共存、分组隔离6.2 响应校验:requestId 关联
private void check(RpcRequest rpcRequest, RpcResponse<?> rpcResponse) {
if (!Objects.equals(rpcRequest.getRequestId(), rpcResponse.getRequestId())) {
throw new RpcException("请求和响应的id不一致");
}
if (RpcResponseStatus.isFailed(rpcResponse.getCode())) {
throw new RpcException("响应值为失败: " + rpcResponse.getMsg());
}
}6.3 ZK 节点监听:缓存 + Watch
// ZkClient.java
private final Map<String, List<String>> SERVICE_ADDRESS_CACHE = new ConcurrentHashMap<>();
public List<String> getChildrenNode(String path) {
if (SERVICE_ADDRESS_CACHE.containsKey(path)) {
return SERVICE_ADDRESS_CACHE.get(path);
}
List<String> children = client.getChildren().forPath(path);
SERVICE_ADDRESS_CACHE.put(path, children);
watchNode(path); // 注册 Watcher,节点变更时自动刷新缓存
return children;
}七、Trade-offs 与局限性
八、技术栈
Further Reading
Dubbo 源码(生产级 RPC 框架参考)
评论区