侧边栏壁纸
博主头像
PPP的日记

行动起来,活在当下

  • 累计撰写 13 篇文章
  • 累计创建 14 个标签
  • 累计收到 23 条评论

目 录CONTENT

文章目录

从零实现 RPC 框架:pig-rpc 的架构设计与实现

TL;DR: 本文介绍一个手写的 Java RPC 框架 pig-rpc,基于 Socket 通信、Java 动态代理、ZooKeeper 服务注册与发现,实现了一个完整的分布式服务调用框架。包含服务发布/订阅、负载均衡(随机策略)、服务缓存、单例工厂、线程池管理等核心组件,代码结构清晰,适合作为理解 RPC 原理的入门项目。


一、为什么需要 RPC

在分布式系统中,服务通常部署在不同的机器上。一个简单的 HTTP 调用在跨进程场景下会产生大量开销,而 RPC(Remote Procedure Call)通过自定义通信协议和序列化机制,能够比 HTTP 更快、更灵活地实现跨进程调用。

本文介绍的 pig-rpc 是一个教学级别的 RPC 框架实现,涵盖了 RPC 的核心要素,不依赖 Spring 等框架,适合理解 RPC 底层原理。

本项目面向具备 Java 基础和分布式系统基本概念的开发者在阅读前建议熟悉:接口与实现分离、单例模式、线程池、Socket 通信基础。


二、系统架构

                               ┌─────────────────────────────────┐
                               │           Client                │
                               │                                 │
                               │  ProxyUtils.getProxy(UserService.class)
                               │              ↓                  │
                               │  ┌─────────────────────────┐  │
                               │  │   RpcClientProxy         │  │
                               │  │   (JDK Dynamic Proxy)    │  │
                               │  └────────────┬──────────────┘  │
                               │               ↓                 │
                               │  ┌─────────────────────────┐  │
                               │  │    SocketRpcClient      │  │
                               │  │  (ServiceDiscovery lookup)│  │
                               │  └────────────┬──────────────┘  │
                               └────────────────┼────────────────┘
                                                │
                     ┌──────────────────────────┼──────────────────────────┐
                     │                          ↓                          │
                     │         ┌──────────────────────────────┐            │
                     │         │    ZooKeeper (注册中心)       │           │
                     │         │/pig-rpc/com.pig.api.UserService│          │
                     │         │    /192.168.79.1:8888        │            │
                     │         └──────────────────────────────┘            │
                     │                          ↑                           │
                     └──────────────────────────┼──────────────────────────┘
                                                │
                               ┌────────────────┼────────────────┐
                               │                ↓                │
                               │  ┌──────────────────────────┐   │
                               │  │     SocketRpcServer      │   │
                               │  │  (ServerSocket.accept)   │   │
                               │  └────────────┬─────────────┘   │
                               │               ↓                 │
                               │  ┌──────────────────────────┐   │
                               │  │    SocketReqHandler      │   │
                               │  │    (per-connection)      │   │
                               │  └────────────┬─────────────┘   │
                               │               ↓                 │
                               │  ┌──────────────────────────┐   │
                               │  │    RpcRequestHandler     │   │
                               │  │  (reflection invoke)     │   │
                               │  └────────────┬─────────────┘   │
                               │               ↓                 │
                               │  ┌──────────────────────────┐   │
                               │  │   UserServiceImpl        │   │
                               │  │   (业务实现)              │   │
                               │  └──────────────────────────┘   │
                               │                                 │
                               │           Server                │
                               └──────────────────────────────────┘

三、核心模块实现

3.1 通信层:Socket 传输

客户端使用原生 Socket 发起请求,服务端使用 ServerSocket 接收连接:

 // SocketRpcClient.java
 public RpcResponse<?> sendReq(RpcRequest rpcRequest) {
     // 1. 通过 ZooKeeper 发现服务地址
     InetSocketAddress address = serviceDiscovery.lookupService(rpcRequest);
 ​
     try (Socket socket = new Socket(address.getAddress(), address.getPort())) {
         // 2. 序列化请求并发送
         ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
         outputStream.writeObject(rpcRequest);
         outputStream.flush();
 ​
         // 3. 读取响应
         ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
         Object o = inputStream.readObject();
 ​
         return (RpcResponse<?>) o;
     } catch (Exception e) {
         throw new RuntimeException(e);
     }
 }

服务端为每个连接分配一个线程处理:

 // SocketRpcServer.java
 public void start() {
     ShutdownHookUtils.clearAll();
 ​
     try (ServerSocket serverSocket = new ServerSocket(port)) {
         log.info("服务启动, 端口: {}", port);
 ​
         Socket socket;
         while ((socket = serverSocket.accept()) != null) {
             // 每个连接一个线程处理
             executor.submit(new SocketReqHandler(socket, rpcRequestHandler));
         }
     }
 }

3.2 请求与响应:DTO 设计

 // RpcRequest.java
 @Data
 public class RpcRequest implements Serializable {
     private String requestId;      // 请求唯一 ID,用于关联响应
     private String interfaceName;  // 接口名
     private String methodName;     // 方法名
     private Object[] params;       // 参数列表
     private Class<?>[] paramTypes;  // 参数类型
     private String version;        // 版本号(支持多版本)
     private String group;          // 分组(支持分组隔离)
 ​
     // rpcServiceName = interfaceName + version + group
     public String rpcServiceName() {
         return getInterfaceName()
             + StrUtil.blankToDefault(getVersion(), StrUtil.EMPTY)
             + StrUtil.blankToDefault(getGroup(), StrUtil.EMPTY);
     }
 }
 // RpcResponse.java
 @Data
 public class RpcResponse<T> implements Serializable {
     private String requestId;   // 关联请求 ID
     private Integer code;       // 状态码:0=成功,9999=失败
     private String msg;        // 错误信息
     private T data;            // 响应数据
 ​
     public static <T> RpcResponse<T> success(String requestId, T data) {
         RpcResponse<T> rpcResponse = new RpcResponse<T>();
         rpcResponse.setRequestId(requestId);
         rpcResponse.setCode(RpcResponseStatus.SUCCESS.getCode());
         rpcResponse.setData(data);
         return rpcResponse;
     }
 }

3.3 动态代理:客户端调用透明化

RPC 客户端不需要知道底层 Socket 细节,通过 JDK 动态代理实现透明调用:

 // RpcClientProxy.java
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
     // 1. 构建 RPC 请求
     RpcRequest rpcRequest = RpcRequest.builder()
         .requestId(IdUtil.fastSimpleUUID())
         .interfaceName(method.getDeclaringClass().getCanonicalName())
         .methodName(method.getName())
         .params(args)
         .paramTypes(method.getParameterTypes())
         .version(config.getVersion())
         .group(config.getGroup())
         .build();
 ​
     // 2. 发送请求(Socket 通信)
     RpcResponse<?> rpcResponse = rpcClient.sendReq(rpcRequest);
 ​
     // 3. 校验响应
     check(rpcRequest, rpcResponse);
 ​
     // 4. 返回结果(对调用方透明)
     return rpcResponse.getData();
 }

用户调用方式完全透明:

 // Client
 UserService userService = ProxyUtils.getProxy(UserService.class);
 User user = userService.getUser(1L);  // 看起来像本地调用,实际是 RPC

3.4 服务端请求处理:反射调用

 // RpcRequestHandler.java
 public Object invoke(RpcRequest rpcRequest) {
     String rpcServiceName = rpcRequest.rpcServiceName();
 ​
     // 1. 获取服务实例
     Object service = serviceProvider.getService(rpcServiceName);
 ​
     // 2. 通过反射调用方法
     Method method = service.getClass().getMethod(
         rpcRequest.getMethodName(),
         rpcRequest.getParamTypes()
     );
 ​
     return method.invoke(service, rpcRequest.getParams());
 }

3.5 服务注册与发现:ZooKeeper

服务端发布服务时,将地址注册到 ZooKeeper:

 // ZkServiceRegistry.java
 public void registerService(String rpcServiceName, InetSocketAddress address) {
     // /pig-rpc/com.pig.api.UserService/192.168.79.1:8888
     String path = RpcConstant.ZK_RPC_ROOT_PATH
         + StrUtil.SLASH
         + rpcServiceName
         + StrUtil.SLASH
         + IPUtils.toIpPort(address);
 ​
     zkClient.createPersistentNode(path);
 }

客户端发现服务时,从 ZooKeeper 读取可用地址列表:

 // ZkServiceDiscovery.java
 public InetSocketAddress lookupService(RpcRequest rpcRequest) {
     String path = RpcConstant.ZK_RPC_ROOT_PATH + StrUtil.SLASH + rpcRequest.rpcServiceName();
 ​
     // 从 ZK 获取该服务的所有可用地址
     List<String> children = zkClient.getChildrenNode(path);
 ​
     // 负载均衡选择一个
     String address = loadBalance.select(children);
 ​
     return IPUtils.toInetSocketAddress(address);
 }

3.6 负载均衡:随机策略

 // RandomLoadBalance.java
 public String select(List<String> list) {
     return RandomUtil.randomEle(list);
 }

3.7 单例工厂:组件管理

避免每次创建新的 ZooKeeper 连接等重量级对象:

 // SingletonFactory.java
 public static <T> T getInstance(Class<T> clazz) {
     if (INSTANCE_CACHE.containsKey(clazz)) {
         return clazz.cast(INSTANCE_CACHE.get(clazz));
     }
 ​
     synchronized (SingletonFactory.class) {
         if (INSTANCE_CACHE.containsKey(clazz)) {
             return clazz.cast(INSTANCE_CACHE.get(clazz));
         }
 ​
         T t = clazz.getDeclaredConstructor().newInstance();
         INSTANCE_CACHE.put(clazz, t);
         return t;
     }
 }

3.8 线程池:IO 密集型优化

RPC 通信以 IO 为主,采用 IO 密集型线程池配置:

 // ThreadPoolUtils.java
 private static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
 private static final int IO_INTENSIVE_NUM = CPU_NUM * 2;
 ​
 public static ExecutorService createIoIntensiveThreadPool(String poolName) {
     return createThreadPool(IO_INTENSIVE_NUM, poolName);
 }

四、项目结构

 pig-rpc/
 ├── pom.xml                    # 父 POM,版本管理
 ├── rpc-core/                  # 核心框架模块
 │   ├── pom.xml               # 依赖 Curator (ZK 客户端)
 │   └── src/main/java/com/pig/rpc/
 │       ├── constant/         # 常量定义(ZK 地址、端口)
 │       ├── dto/              # RpcRequest / RpcResponse
 │       ├── enums/            # RpcResponseStatus
 │       ├── exception/        # RpcException
 │       ├── factory/           # SingletonFactory
 │       ├── handler/           # RpcRequestHandler(反射调用)
 │       ├── loadbalance/       # 负载均衡接口 + 实现
 │       ├── provider/          # 服务发布接口 + 实现
 │       ├── proxy/             # RpcClientProxy(JDK 动态代理)
 │       ├── registry/          # 服务注册/发现(ZK 实现)
 │       ├── transmission/      # Socket 客户端/服务端
 │       └── util/              # IPUtils、ThreadPoolUtils
 ├── test-api/                  # 共享接口模块(服务端实现 + 客户端依赖)
 │   └── src/main/java/com/pig/api/
 │       ├── User.java         # 传输对象
 │       └── UserService.java  # 服务接口
 ├── test-server/               # 服务端 Demo
 │   └── src/main/java/com/pig/server/
 │       ├── Main.java         # 启动入口
 │       └── service/UserServiceImpl.java  # 服务实现
 └── test-client/               # 客户端 Demo
     └── src/main/java/com/pig/client/
         ├── Main.java         # 调用入口
         └── utils/ProxyUtils.java  # 代理工具类

五、服务端/客户端使用示例

服务端发布服务

 // test-server/Main.java
 public class Main {
     public static void main(String[] args) {
         // 1. 创建服务配置
         RpcServiceConfig config = new RpcServiceConfig(new UserServiceImpl());
 ​
         // 2. 创建并启动 RPC Server
         RpcServer rpcServer = new SocketRpcServer();
         rpcServer.publishService(config);  // 发布到 ZK
         rpcServer.start();
     }
 }

客户端调用服务

 // test-client/Main.java
 public class Main {
     public static void main(String[] args) {
         // 1. 获取代理对象(完全透明,像本地调用一样)
         UserService userService = ProxyUtils.getProxy(UserService.class);
 ​
         // 2. 调用服务
         ExecutorService executorService = Executors.newFixedThreadPool(10);
         for (int i = 0; i < 10; i++) {
             executorService.execute(() -> {
                 User user = userService.getUser(1L);
                 System.out.println(user);
             });
         }
     }
 }

六、关键技术细节

6.1 服务名设计:支持版本和分组

 // rpcServiceName = interfaceName + version + group
 // 例: com.pig.api.UserService + v1 + group1
 // 支持:多版本共存、分组隔离

6.2 响应校验:requestId 关联

 private void check(RpcRequest rpcRequest, RpcResponse<?> rpcResponse) {
     if (!Objects.equals(rpcRequest.getRequestId(), rpcResponse.getRequestId())) {
         throw new RpcException("请求和响应的id不一致");
     }
     if (RpcResponseStatus.isFailed(rpcResponse.getCode())) {
         throw new RpcException("响应值为失败: " + rpcResponse.getMsg());
     }
 }

6.3 ZK 节点监听:缓存 + Watch

 // ZkClient.java
 private final Map<String, List<String>> SERVICE_ADDRESS_CACHE = new ConcurrentHashMap<>();
 ​
 public List<String> getChildrenNode(String path) {
     if (SERVICE_ADDRESS_CACHE.containsKey(path)) {
         return SERVICE_ADDRESS_CACHE.get(path);
     }
 ​
     List<String> children = client.getChildren().forPath(path);
     SERVICE_ADDRESS_CACHE.put(path, children);
 ​
     watchNode(path);  // 注册 Watcher,节点变更时自动刷新缓存
     return children;
 }

七、Trade-offs 与局限性

决策

Trade-off

改进方向

Socket 通信

实现简单,但性能不如 Netty

可替换为 Netty 传输层

随机负载均衡

实现简单,但不保证均匀

可添加 RoundRobin、Weighted 等策略

同步阻塞调用

实现简单,不支持异步

可添加 Future/RxJava 异步支持

临时节点 vs 永久节点

PERSISTENT 节点保证可靠性,但故障时需手动清理

可改用临时节点 + 心跳

无连接池

每次请求新建连接,开销大

可添加 Socket 连接池复用

Java 序列化

兼容性最好,但性能一般

可替换为 Hessian/Protobuf/Kryo


八、技术栈

组件

技术选型

用途

通信

原生 Socket + ObjectInputStream

跨进程调用

代理

JDK Dynamic Proxy

透明化 RPC 调用

注册中心

Apache Curator (ZooKeeper)

服务注册与发现

序列化

Java 内置序列化

对象传输

工具库

Hutool

字符串、UUID 等工具

日志

Logback + Slf4j

日志输出

构建

Maven (Java 1.8+)

依赖管理


Further Reading

0

评论区