Ray:LLM 时代的分布式计算底座
Ray:LLM 时代的分布式计算底座
如果你在 AI Infra 领域工作,Ray 几乎是绕不开的。vLLM 的分布式推理跑在 Ray 上,OpenAI 用 Ray 做训练数据处理,Anthropic 用它编排模型评估流水线,字节跳动用 Ray Serve 部署在线推理服务。
但很多人对 Ray 的认知停留在"分布式 Python"的层面。实际上,Ray 的设计思想远比这深刻——它本质上是一个通用的分布式计算运行时,通过极简的编程接口把单机代码无缝扩展到集群。
这篇文章从架构设计讲起,逐层拆解 Ray 的核心原语、AI Libraries 生态以及在 LLM 场景中的实际应用。
一、为什么需要 Ray
在 LLM 时代之前,分布式计算的主要工具是 Spark(批处理)、Flink(流处理)和 Kubernetes(容器编排)。它们各自解决一类问题,但 AI 工作负载的特点让这些工具都不太顺手:
| 特点 | 传统大数据 | AI 工作负载 |
|---|---|---|
| 计算模式 | MapReduce / 流 | 异构混合(CPU 预处理 + GPU 训练 + GPU 推理) |
| 状态管理 | 无状态或弱状态 | 强状态(模型权重、KV Cache、Actor 状态) |
| 资源需求 | CPU + 内存 | CPU + GPU + 高速互联 |
| 延迟要求 | 分钟到小时 | 毫秒到秒(推理) |
| 编程模型 | DSL(SQL/DataStream) | 通用 Python |
Ray 的定位是:用 Python 原生的方式编写分布式程序,让工程师不需要关心底层的序列化、调度、故障恢复。你只需要在函数上加一个 @ray.remote 装饰器,它就变成了一个可以在集群中任意节点上运行的分布式任务。
二、架构设计
2.1 整体架构
┌─────────────────────────────────────────────┐
│ AI Libraries │
│ Ray Train │ Ray Serve │ Ray Data │ Ray Tune │
├─────────────────────────────────────────────┤
│ Ray Core │
│ Task │ Actor │ Object │
├─────────────────────────────────────────────┤
│ Distributed Runtime │
│ GCS │ Raylet │ Scheduler │ Object Store │
└─────────────────────────────────────────────┘
三层架构:
- Distributed Runtime:底层运行时,负责集群管理、任务调度、对象存储
- Ray Core:三个编程原语——Task、Actor、Object
- AI Libraries:基于 Core 构建的上层库,覆盖训练、推理、数据处理、超参调优
2.2 关键组件
GCS(Global Control Store):集群的"大脑",存储所有元数据——节点信息、Actor 注册表、Job 状态。基于 Redis 或内置 KV 存储。
Raylet:每个节点上的守护进程,包含:
- 本地调度器:决定任务在本地执行还是转发给其他节点
- 对象管理器:管理节点上的共享内存对象存储(基于 Apache Arrow/Plasma)
Object Store:每个节点上的共享内存区域,用于零拷贝数据共享。大对象自动溢出到磁盘或分布式存储。
Node A Node B
┌────────────────┐ ┌────────────────┐
│ Worker 1 │ │ Worker 3 │
│ Worker 2 │ │ Worker 4 │
├────────────────┤ ├────────────────┤
│ Raylet │◄──gRPC──► │ Raylet │
│ ┌──────────┐ │ │ ┌──────────┐ │
│ │Obj Store │ │ │ │Obj Store │ │
│ └──────────┘ │ │ └──────────┘ │
└────────────────┘ └────────────────┘
│ │
└───────────┬──────────────────┘
▼
┌──────────┐
│ GCS │
└──────────┘
2.3 调度策略
Ray 的调度器采用两级调度:
- 本地调度优先:如果本地节点有足够资源,任务就在本地执行(减少数据传输)
- 全局调度兜底:本地资源不足时,Raylet 将任务转发给 GCS 进行全局调度
对于 GPU 任务,调度器会感知 num_gpus 资源需求,确保任务被调度到有空闲 GPU 的节点:
@ray.remote(num_gpus=1)
def train_on_gpu(data):
# 这个函数一定会被调度到有 GPU 的节点上
model = load_model()
return model.train(data)
三、Core 三原语
Ray Core 的设计哲学是用最少的概念覆盖最多的场景。三个原语——Task、Actor、Object——足以表达几乎所有分布式计算模式。
3.1 Task:无状态的远程函数
在普通 Python 函数上加 @ray.remote,它就变成了可以在集群中任意节点上异步执行的 Task。
import ray
ray.init()
@ray.remote
def process_chunk(data_chunk):
"""处理一个数据分片——在集群中某个节点上运行"""
result = heavy_computation(data_chunk)
return result
# 提交 100 个并行任务
chunks = split_data(raw_data, num_chunks=100)
futures = [process_chunk.remote(chunk) for chunk in chunks]
# 异步获取结果
results = ray.get(futures)
关键特性:
- 异步非阻塞:
.remote()立即返回ObjectRef(future),不等待执行完成 - 自动序列化:参数和返回值通过 Arrow 序列化,大对象走共享内存零拷贝
- 资源声明:通过
num_cpus、num_gpus、memory声明资源需求
指定资源和重试策略:
@ray.remote(num_cpus=2, num_gpus=0.5, max_retries=3)
def gpu_task(batch):
"""使用 2 个 CPU 核心和半块 GPU,失败自动重试 3 次"""
return inference(batch)
num_gpus=0.5 意味着一块 GPU 可以同时跑两个这样的任务——这在推理场景中很实用。
3.2 Actor:有状态的远程对象
Actor 是 Ray 最强大的原语。它把一个 Python class 变成集群中长期运行的有状态服务。
@ray.remote(num_gpus=1)
class ModelServer:
def __init__(self, model_path):
self.model = load_model(model_path)
self.request_count = 0
def predict(self, input_data):
self.request_count += 1
return self.model(input_data)
def get_stats(self):
return {"total_requests": self.request_count}
# 创建 Actor 实例——在某个有 GPU 的节点上启动
server = ModelServer.remote("/models/llama-7b")
# 调用方法——发送消息给 Actor
result_ref = server.predict.remote(input_data)
result = ray.get(result_ref)
# 状态在调用之间保持
stats = ray.get(server.get_stats.remote())
# {"total_requests": 1}
Actor 与 Task 的本质区别:
- Task 是无状态的——每次调用独立,函数执行完进程就结束
- Actor 是有状态的——实例持续存活,状态在方法调用间保持
这让 Actor 天然适合:
- 模型推理服务(模型权重常驻 GPU 显存)
- 分布式训练中的 Parameter Server
- 流处理中的有状态算子
3.3 Object:分布式对象存储
Ray 的第三个原语是 Object——存储在分布式共享内存中的不可变数据。
# 显式放入 Object Store
large_dataset = load_dataset() # 假设 10GB
dataset_ref = ray.put(large_dataset)
# 多个 Task 共享同一份数据——零拷贝
@ray.remote
def process(data_ref, partition_id):
data = ray.get(data_ref) # 本地节点直接从共享内存读取
return analyze(data, partition_id)
# 10 个 Task 共享同一个 10GB 的数据对象
futures = [process.remote(dataset_ref, i) for i in range(10)]
关键机制:
- 不可变性:Object 创建后不可修改,避免了分布式一致性问题
- 引用计数:无人引用时自动 GC
- 溢出机制:共享内存不足时自动溢出到磁盘
- 跨节点传输:需要时通过 Raylet 间的 gRPC 自动拉取远端对象
三原语的组合能力
三个简单的原语组合起来能表达复杂的分布式模式:
# 模式:多个有状态 Actor 并行处理,结果汇总
@ray.remote
class Worker:
def __init__(self, worker_id):
self.id = worker_id
self.processed = 0
def process(self, item):
result = expensive_computation(item)
self.processed += 1
return result
# 创建 Worker 池
workers = [Worker.remote(i) for i in range(4)]
# 分发任务(Round-Robin)
futures = []
for i, item in enumerate(work_items):
worker = workers[i % len(workers)]
futures.append(worker.process.remote(item))
# 收集所有结果
results = ray.get(futures)
四、AI Libraries 生态
Ray Core 提供底层原语,上层的 AI Libraries 针对具体场景封装了最佳实践。
4.1 Ray Train:分布式训练
封装了 PyTorch DDP、DeepSpeed、FSDP 等分布式训练策略,用统一的 API 屏蔽底层差异。
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
def train_func():
"""每个 Worker 执行的训练逻辑"""
import torch
from ray.train import get_context
model = build_model()
# Ray 自动做 DDP 包装
model = ray.train.torch.prepare_model(model)
dataloader = build_dataloader()
dataloader = ray.train.torch.prepare_data_loader(dataloader)
for epoch in range(10):
for batch in dataloader:
loss = train_step(model, batch)
# 报告指标 + 保存 checkpoint
ray.train.report(
metrics={"loss": loss.item(), "epoch": epoch},
checkpoint=ray.train.Checkpoint.from_dict(
{"model_state_dict": model.state_dict()}
),
)
trainer = TorchTrainer(
train_func,
scaling_config=ScalingConfig(
num_workers=4, # 4 个训练 Worker
use_gpu=True, # 每个 Worker 使用 1 块 GPU
resources_per_worker={"CPU": 4, "GPU": 1},
),
)
result = trainer.fit()
print(f"Best loss: {result.metrics['loss']}")
Ray Train 的价值在于:
- 不改训练代码就能从单卡扩展到多卡多机
- 内置 checkpoint 管理和故障恢复
- 与 Ray Tune 无缝集成做超参搜索
4.2 Ray Serve:在线推理
Ray Serve 是 Ray 生态中与 LLM 关联最紧密的组件。vLLM 的多卡推理底层就依赖 Ray Serve 做 Actor 编排。
from ray import serve
import ray
@serve.deployment(
num_replicas=2, # 2 个副本
ray_actor_options={"num_gpus": 1}, # 每个副本 1 块 GPU
max_ongoing_requests=32, # 最大并发请求数
)
class LLMService:
def __init__(self):
from vllm import LLM
self.model = LLM(
model="meta-llama/Llama-3-8B-Instruct",
tensor_parallel_size=1,
)
async def __call__(self, request):
prompt = (await request.json())["prompt"]
outputs = self.model.generate([prompt])
return {"text": outputs[0].outputs[0].text}
app = LLMService.bind()
serve.run(app, host="0.0.0.0", port=8000)
Ray Serve 的核心能力:
- 自动伸缩:根据请求队列长度自动调整副本数
- 多模型组合:通过 DeploymentGraph 组合多个模型形成推理 Pipeline
- 流式响应:原生支持 SSE,适配 LLM 的 token-by-token 生成
- 零停机更新:滚动更新模型版本,不中断服务
4.3 Ray Data:分布式数据处理
面向 ML 工作负载的数据处理库,可以看作"ML 领域的 Spark",但与 GPU 训练/推理流水线无缝衔接。
import ray
# 读取数据
ds = ray.data.read_parquet("s3://bucket/training_data/")
# 分布式预处理
ds = ds.map(tokenize, num_gpus=0) # CPU 上做 tokenize
ds = ds.filter(lambda x: len(x["tokens"]) <= 4096) # 过滤超长序列
ds = ds.random_shuffle() # 全局随机打乱
# 直接喂给 Ray Train——零拷贝
trainer = TorchTrainer(
train_func,
datasets={"train": ds},
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
与 Spark 的区别:Ray Data 原生支持 GPU 算子(比如在 GPU 上做数据增强),且与 Train/Serve 共享同一个集群,不需要跨系统传数据。
4.4 Ray Tune:超参调优
from ray import tune
from ray.tune.schedulers import ASHAScheduler
search_space = {
"lr": tune.loguniform(1e-5, 1e-3),
"batch_size": tune.choice([16, 32, 64]),
"num_layers": tune.randint(2, 8),
}
tuner = tune.Tuner(
train_func,
param_space=search_space,
tune_config=tune.TuneConfig(
metric="loss",
mode="min",
num_samples=50, # 尝试 50 组超参
scheduler=ASHAScheduler(), # 早停表现差的试验
),
run_config=tune.RunConfig(
storage_path="s3://bucket/ray_results",
),
)
results = tuner.fit()
best = results.get_best_result()
print(f"Best config: {best.config}")
ASHA 调度器的作用:在训练早期就终止表现明显差的超参组合,把 GPU 资源让给有前途的试验。50 组超参实际可能只需要跑完其中 10-15 组的全部 epoch。
五、Ray 在 LLM 时代的定位
Ray 在 LLM 技术栈中不是某个环节的工具,而是贯穿训练、推理、数据处理全链路的底层运行时。
LLM 全链路:
↓ Ray Data
训练数据清洗 → 预处理 → 分布式训练 → 模型评估 → 在线推理
↓ Ray Train ↓ Ray Tune ↓ Ray Serve
5.1 vLLM + Ray:分布式推理的标准组合
vLLM 使用 Ray Actor 实现 Tensor Parallelism——模型的不同层分布在不同 GPU 上,通过 NCCL 通信。
# vLLM 多卡推理——底层自动创建 Ray Actor
from vllm import LLM
model = LLM(
model="meta-llama/Llama-3-70B-Instruct",
tensor_parallel_size=4, # 4 卡 Tensor Parallel
# vLLM 内部会创建 4 个 Ray Actor,每个绑定 1 块 GPU
)
当你设置 tensor_parallel_size > 1 时,vLLM 内部会:
- 启动 Ray 集群(如果还没启动)
- 创建 N 个 GPU Worker Actor
- 把模型权重分片加载到各 Actor
- 推理时通过 NCCL AllReduce 同步中间结果
5.2 训练数据处理
LLM 预训练数据处理的典型流水线:
import ray
# 处理 TB 级别的文本数据
ds = ray.data.read_json("s3://corpus/raw/")
# 1. 语言检测和过滤
ds = ds.map_batches(detect_language, batch_size=1000)
ds = ds.filter(lambda x: x["lang"] == "zh" and x["score"] > 0.8)
# 2. 去重(MinHash LSH)
ds = ds.map_batches(compute_minhash, batch_size=500)
ds = deduplicate(ds, threshold=0.8)
# 3. 质量过滤
ds = ds.map_batches(quality_score, batch_size=500, num_gpus=0.25)
ds = ds.filter(lambda x: x["quality"] > 0.6)
# 4. Tokenize 并写出
ds = ds.map_batches(tokenize, batch_size=256)
ds.write_parquet("s3://corpus/processed/")
这个流水线处理 TB 级数据时,Ray Data 会自动在集群中分配任务、管理 backpressure、处理 straggler。比自己用 multiprocessing + S3 拼凑可靠得多。
六、生产实践经验
6.1 集群部署
在 Kubernetes 上部署 Ray 集群推荐使用 KubeRay Operator:
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: llm-cluster
spec:
headGroupSpec:
rayStartParams:
dashboard-host: "0.0.0.0"
template:
spec:
containers:
- name: ray-head
image: rayproject/ray-ml:2.41.0-gpu
resources:
limits:
cpu: "8"
memory: "32Gi"
workerGroupSpecs:
- replicas: 4
groupName: gpu-workers
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray-ml:2.41.0-gpu
resources:
limits:
cpu: "16"
memory: "64Gi"
nvidia.com/gpu: "1"
6.2 常见陷阱
对象传递大小——不要在 .remote() 参数里直接传大对象:
# 错误:每次调用都序列化 100MB 数据
large_data = load_data() # 100MB
futures = [process.remote(large_data, i) for i in range(100)]
# → 序列化 100 次 = 10GB 网络传输
# 正确:先 put 到 Object Store,传引用
data_ref = ray.put(large_data) # 只序列化 1 次
futures = [process.remote(data_ref, i) for i in range(100)]
# → 100 次传的只是 ObjectRef(几十字节)
Actor 死锁——避免 Actor 之间同步等待:
# 危险:Actor A 等 Actor B,B 又等 A
@ray.remote
class A:
def __init__(self, b):
self.b = b
def method(self):
return ray.get(self.b.method.remote()) # 同步等待 B
# 安全:使用异步方法
@ray.remote
class A:
def __init__(self, b):
self.b = b
async def method(self):
return await self.b.method.remote() # 异步等待,不阻塞
资源声明不准确——GPU 声明不对会导致 OOM 或资源浪费:
# 推理任务实际只需要半块 GPU 显存
@ray.remote(num_gpus=0.5)
def inference(batch):
...
# 训练任务需要整块 GPU
@ray.remote(num_gpus=1)
def train(data):
...
6.3 监控与调试
Ray Dashboard(默认 8265 端口)提供集群全景:
- 节点状态、CPU/GPU/内存利用率
- Actor 列表、状态、日志
- Job 进度和资源消耗
- Object Store 使用情况
生产环境建议接入 Prometheus + Grafana:
# Ray 内置 Prometheus metrics 端点
# 启动时开启:ray start --metrics-export-port=8080
#
# 关键指标:
# ray_node_cpu_utilization
# ray_node_gpu_utilization
# ray_node_mem_used
# ray_actors_total
# ray_tasks_running
6.4 性能调优清单
| 场景 | 优化策略 |
|---|---|
| 大数据传输 | ray.put() + ObjectRef,避免重复序列化 |
| 高并发推理 | Actor 多副本 + max_concurrency |
| 批处理任务 | 合理设置 batch_size,减少调度开销 |
| GPU 利用率低 | 用 fractional GPU(num_gpus=0.5)提升 GPU 复用 |
| 长尾任务 | ray.wait() 替代 ray.get() 实现渐进式处理 |
| 内存溢出 | 配置 object_spilling 到磁盘/S3 |
# ray.wait 渐进式处理——不必等最慢的任务
remaining = [task.remote(i) for i in range(100)]
results = []
while remaining:
done, remaining = ray.wait(remaining, num_returns=1)
results.extend(ray.get(done))
# 每完成一个就处理一个,不被 straggler 阻塞
七、总结
Ray 的核心价值是把分布式系统的复杂性封装在三个简单原语背后。你写的是普通 Python,Ray 处理序列化、调度、故障恢复、资源管理。
对于 AI 工程师来说,Ray 的掌握优先级:
- 必须掌握:Task、Actor、Object 三原语,
ray.put()/ray.get()/ray.wait()的正确使用 - 按需掌握:Ray Serve(做推理服务时)、Ray Train(做分布式训练时)、Ray Data(做数据处理时)
- 了解即可:底层调度策略、GCS 架构、Object Store 实现细节
在 LLM 时代,Ray 已经是事实上的 AI Infra 底座。掌握它不只是多会一个框架,而是理解了 AI 系统从训练到推理的分布式运行方式。