返回博客列表

Ray:LLM 时代的分布式计算底座

·AI

Ray:LLM 时代的分布式计算底座

如果你在 AI Infra 领域工作,Ray 几乎是绕不开的。vLLM 的分布式推理跑在 Ray 上,OpenAI 用 Ray 做训练数据处理,Anthropic 用它编排模型评估流水线,字节跳动用 Ray Serve 部署在线推理服务。

但很多人对 Ray 的认知停留在"分布式 Python"的层面。实际上,Ray 的设计思想远比这深刻——它本质上是一个通用的分布式计算运行时,通过极简的编程接口把单机代码无缝扩展到集群。

这篇文章从架构设计讲起,逐层拆解 Ray 的核心原语、AI Libraries 生态以及在 LLM 场景中的实际应用。


一、为什么需要 Ray

在 LLM 时代之前,分布式计算的主要工具是 Spark(批处理)、Flink(流处理)和 Kubernetes(容器编排)。它们各自解决一类问题,但 AI 工作负载的特点让这些工具都不太顺手:

特点 传统大数据 AI 工作负载
计算模式 MapReduce / 流 异构混合(CPU 预处理 + GPU 训练 + GPU 推理)
状态管理 无状态或弱状态 强状态(模型权重、KV Cache、Actor 状态)
资源需求 CPU + 内存 CPU + GPU + 高速互联
延迟要求 分钟到小时 毫秒到秒(推理)
编程模型 DSL(SQL/DataStream) 通用 Python

Ray 的定位是:用 Python 原生的方式编写分布式程序,让工程师不需要关心底层的序列化、调度、故障恢复。你只需要在函数上加一个 @ray.remote 装饰器,它就变成了一个可以在集群中任意节点上运行的分布式任务。


二、架构设计

2.1 整体架构

┌─────────────────────────────────────────────┐
│               AI Libraries                   │
│  Ray Train │ Ray Serve │ Ray Data │ Ray Tune │
├─────────────────────────────────────────────┤
│               Ray Core                       │
│      Task    │    Actor    │    Object       │
├─────────────────────────────────────────────┤
│          Distributed Runtime                 │
│  GCS │ Raylet │ Scheduler │ Object Store    │
└─────────────────────────────────────────────┘

三层架构:

  • Distributed Runtime:底层运行时,负责集群管理、任务调度、对象存储
  • Ray Core:三个编程原语——Task、Actor、Object
  • AI Libraries:基于 Core 构建的上层库,覆盖训练、推理、数据处理、超参调优

2.2 关键组件

GCS(Global Control Store):集群的"大脑",存储所有元数据——节点信息、Actor 注册表、Job 状态。基于 Redis 或内置 KV 存储。

Raylet:每个节点上的守护进程,包含:

  • 本地调度器:决定任务在本地执行还是转发给其他节点
  • 对象管理器:管理节点上的共享内存对象存储(基于 Apache Arrow/Plasma)

Object Store:每个节点上的共享内存区域,用于零拷贝数据共享。大对象自动溢出到磁盘或分布式存储。

Node A                          Node B
┌────────────────┐             ┌────────────────┐
│  Worker 1      │             │  Worker 3      │
│  Worker 2      │             │  Worker 4      │
├────────────────┤             ├────────────────┤
│    Raylet      │◄──gRPC──►  │    Raylet      │
│  ┌──────────┐  │             │  ┌──────────┐  │
│  │Obj Store │  │             │  │Obj Store │  │
│  └──────────┘  │             │  └──────────┘  │
└────────────────┘             └────────────────┘
         │                              │
         └───────────┬──────────────────┘
                     ▼
              ┌──────────┐
              │   GCS    │
              └──────────┘

2.3 调度策略

Ray 的调度器采用两级调度:

  1. 本地调度优先:如果本地节点有足够资源,任务就在本地执行(减少数据传输)
  2. 全局调度兜底:本地资源不足时,Raylet 将任务转发给 GCS 进行全局调度

对于 GPU 任务,调度器会感知 num_gpus 资源需求,确保任务被调度到有空闲 GPU 的节点:

@ray.remote(num_gpus=1)
def train_on_gpu(data):
    # 这个函数一定会被调度到有 GPU 的节点上
    model = load_model()
    return model.train(data)

三、Core 三原语

Ray Core 的设计哲学是用最少的概念覆盖最多的场景。三个原语——Task、Actor、Object——足以表达几乎所有分布式计算模式。

3.1 Task:无状态的远程函数

在普通 Python 函数上加 @ray.remote,它就变成了可以在集群中任意节点上异步执行的 Task。

import ray

ray.init()

@ray.remote
def process_chunk(data_chunk):
    """处理一个数据分片——在集群中某个节点上运行"""
    result = heavy_computation(data_chunk)
    return result

# 提交 100 个并行任务
chunks = split_data(raw_data, num_chunks=100)
futures = [process_chunk.remote(chunk) for chunk in chunks]

# 异步获取结果
results = ray.get(futures)

关键特性:

  • 异步非阻塞.remote() 立即返回 ObjectRef(future),不等待执行完成
  • 自动序列化:参数和返回值通过 Arrow 序列化,大对象走共享内存零拷贝
  • 资源声明:通过 num_cpusnum_gpusmemory 声明资源需求

指定资源和重试策略:

@ray.remote(num_cpus=2, num_gpus=0.5, max_retries=3)
def gpu_task(batch):
    """使用 2 个 CPU 核心和半块 GPU,失败自动重试 3 次"""
    return inference(batch)

num_gpus=0.5 意味着一块 GPU 可以同时跑两个这样的任务——这在推理场景中很实用。

3.2 Actor:有状态的远程对象

Actor 是 Ray 最强大的原语。它把一个 Python class 变成集群中长期运行的有状态服务。

@ray.remote(num_gpus=1)
class ModelServer:
    def __init__(self, model_path):
        self.model = load_model(model_path)
        self.request_count = 0

    def predict(self, input_data):
        self.request_count += 1
        return self.model(input_data)

    def get_stats(self):
        return {"total_requests": self.request_count}

# 创建 Actor 实例——在某个有 GPU 的节点上启动
server = ModelServer.remote("/models/llama-7b")

# 调用方法——发送消息给 Actor
result_ref = server.predict.remote(input_data)
result = ray.get(result_ref)

# 状态在调用之间保持
stats = ray.get(server.get_stats.remote())
# {"total_requests": 1}

Actor 与 Task 的本质区别:

  • Task 是无状态的——每次调用独立,函数执行完进程就结束
  • Actor 是有状态的——实例持续存活,状态在方法调用间保持

这让 Actor 天然适合:

  • 模型推理服务(模型权重常驻 GPU 显存)
  • 分布式训练中的 Parameter Server
  • 流处理中的有状态算子

3.3 Object:分布式对象存储

Ray 的第三个原语是 Object——存储在分布式共享内存中的不可变数据。

# 显式放入 Object Store
large_dataset = load_dataset()  # 假设 10GB
dataset_ref = ray.put(large_dataset)

# 多个 Task 共享同一份数据——零拷贝
@ray.remote
def process(data_ref, partition_id):
    data = ray.get(data_ref)  # 本地节点直接从共享内存读取
    return analyze(data, partition_id)

# 10 个 Task 共享同一个 10GB 的数据对象
futures = [process.remote(dataset_ref, i) for i in range(10)]

关键机制:

  • 不可变性:Object 创建后不可修改,避免了分布式一致性问题
  • 引用计数:无人引用时自动 GC
  • 溢出机制:共享内存不足时自动溢出到磁盘
  • 跨节点传输:需要时通过 Raylet 间的 gRPC 自动拉取远端对象

三原语的组合能力

三个简单的原语组合起来能表达复杂的分布式模式:

# 模式:多个有状态 Actor 并行处理,结果汇总
@ray.remote
class Worker:
    def __init__(self, worker_id):
        self.id = worker_id
        self.processed = 0

    def process(self, item):
        result = expensive_computation(item)
        self.processed += 1
        return result

# 创建 Worker 池
workers = [Worker.remote(i) for i in range(4)]

# 分发任务(Round-Robin)
futures = []
for i, item in enumerate(work_items):
    worker = workers[i % len(workers)]
    futures.append(worker.process.remote(item))

# 收集所有结果
results = ray.get(futures)

四、AI Libraries 生态

Ray Core 提供底层原语,上层的 AI Libraries 针对具体场景封装了最佳实践。

4.1 Ray Train:分布式训练

封装了 PyTorch DDP、DeepSpeed、FSDP 等分布式训练策略,用统一的 API 屏蔽底层差异。

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    """每个 Worker 执行的训练逻辑"""
    import torch
    from ray.train import get_context

    model = build_model()
    # Ray 自动做 DDP 包装
    model = ray.train.torch.prepare_model(model)

    dataloader = build_dataloader()
    dataloader = ray.train.torch.prepare_data_loader(dataloader)

    for epoch in range(10):
        for batch in dataloader:
            loss = train_step(model, batch)

        # 报告指标 + 保存 checkpoint
        ray.train.report(
            metrics={"loss": loss.item(), "epoch": epoch},
            checkpoint=ray.train.Checkpoint.from_dict(
                {"model_state_dict": model.state_dict()}
            ),
        )

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,           # 4 个训练 Worker
        use_gpu=True,            # 每个 Worker 使用 1 块 GPU
        resources_per_worker={"CPU": 4, "GPU": 1},
    ),
)
result = trainer.fit()
print(f"Best loss: {result.metrics['loss']}")

Ray Train 的价值在于:

  • 不改训练代码就能从单卡扩展到多卡多机
  • 内置 checkpoint 管理和故障恢复
  • 与 Ray Tune 无缝集成做超参搜索

4.2 Ray Serve:在线推理

Ray Serve 是 Ray 生态中与 LLM 关联最紧密的组件。vLLM 的多卡推理底层就依赖 Ray Serve 做 Actor 编排。

from ray import serve
import ray

@serve.deployment(
    num_replicas=2,                        # 2 个副本
    ray_actor_options={"num_gpus": 1},     # 每个副本 1 块 GPU
    max_ongoing_requests=32,               # 最大并发请求数
)
class LLMService:
    def __init__(self):
        from vllm import LLM
        self.model = LLM(
            model="meta-llama/Llama-3-8B-Instruct",
            tensor_parallel_size=1,
        )

    async def __call__(self, request):
        prompt = (await request.json())["prompt"]
        outputs = self.model.generate([prompt])
        return {"text": outputs[0].outputs[0].text}

app = LLMService.bind()
serve.run(app, host="0.0.0.0", port=8000)

Ray Serve 的核心能力:

  • 自动伸缩:根据请求队列长度自动调整副本数
  • 多模型组合:通过 DeploymentGraph 组合多个模型形成推理 Pipeline
  • 流式响应:原生支持 SSE,适配 LLM 的 token-by-token 生成
  • 零停机更新:滚动更新模型版本,不中断服务

4.3 Ray Data:分布式数据处理

面向 ML 工作负载的数据处理库,可以看作"ML 领域的 Spark",但与 GPU 训练/推理流水线无缝衔接。

import ray

# 读取数据
ds = ray.data.read_parquet("s3://bucket/training_data/")

# 分布式预处理
ds = ds.map(tokenize, num_gpus=0)        # CPU 上做 tokenize
ds = ds.filter(lambda x: len(x["tokens"]) <= 4096)  # 过滤超长序列
ds = ds.random_shuffle()                  # 全局随机打乱

# 直接喂给 Ray Train——零拷贝
trainer = TorchTrainer(
    train_func,
    datasets={"train": ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)

与 Spark 的区别:Ray Data 原生支持 GPU 算子(比如在 GPU 上做数据增强),且与 Train/Serve 共享同一个集群,不需要跨系统传数据。

4.4 Ray Tune:超参调优

from ray import tune
from ray.tune.schedulers import ASHAScheduler

search_space = {
    "lr": tune.loguniform(1e-5, 1e-3),
    "batch_size": tune.choice([16, 32, 64]),
    "num_layers": tune.randint(2, 8),
}

tuner = tune.Tuner(
    train_func,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="loss",
        mode="min",
        num_samples=50,                    # 尝试 50 组超参
        scheduler=ASHAScheduler(),         # 早停表现差的试验
    ),
    run_config=tune.RunConfig(
        storage_path="s3://bucket/ray_results",
    ),
)

results = tuner.fit()
best = results.get_best_result()
print(f"Best config: {best.config}")

ASHA 调度器的作用:在训练早期就终止表现明显差的超参组合,把 GPU 资源让给有前途的试验。50 组超参实际可能只需要跑完其中 10-15 组的全部 epoch。


五、Ray 在 LLM 时代的定位

Ray 在 LLM 技术栈中不是某个环节的工具,而是贯穿训练、推理、数据处理全链路的底层运行时

LLM 全链路:
                      ↓ Ray Data
训练数据清洗 → 预处理 → 分布式训练 → 模型评估 → 在线推理
                      ↓ Ray Train    ↓ Ray Tune  ↓ Ray Serve

5.1 vLLM + Ray:分布式推理的标准组合

vLLM 使用 Ray Actor 实现 Tensor Parallelism——模型的不同层分布在不同 GPU 上,通过 NCCL 通信。

# vLLM 多卡推理——底层自动创建 Ray Actor
from vllm import LLM

model = LLM(
    model="meta-llama/Llama-3-70B-Instruct",
    tensor_parallel_size=4,    # 4 卡 Tensor Parallel
    # vLLM 内部会创建 4 个 Ray Actor,每个绑定 1 块 GPU
)

当你设置 tensor_parallel_size > 1 时,vLLM 内部会:

  1. 启动 Ray 集群(如果还没启动)
  2. 创建 N 个 GPU Worker Actor
  3. 把模型权重分片加载到各 Actor
  4. 推理时通过 NCCL AllReduce 同步中间结果

5.2 训练数据处理

LLM 预训练数据处理的典型流水线:

import ray

# 处理 TB 级别的文本数据
ds = ray.data.read_json("s3://corpus/raw/")

# 1. 语言检测和过滤
ds = ds.map_batches(detect_language, batch_size=1000)
ds = ds.filter(lambda x: x["lang"] == "zh" and x["score"] > 0.8)

# 2. 去重(MinHash LSH)
ds = ds.map_batches(compute_minhash, batch_size=500)
ds = deduplicate(ds, threshold=0.8)

# 3. 质量过滤
ds = ds.map_batches(quality_score, batch_size=500, num_gpus=0.25)
ds = ds.filter(lambda x: x["quality"] > 0.6)

# 4. Tokenize 并写出
ds = ds.map_batches(tokenize, batch_size=256)
ds.write_parquet("s3://corpus/processed/")

这个流水线处理 TB 级数据时,Ray Data 会自动在集群中分配任务、管理 backpressure、处理 straggler。比自己用 multiprocessing + S3 拼凑可靠得多。


六、生产实践经验

6.1 集群部署

在 Kubernetes 上部署 Ray 集群推荐使用 KubeRay Operator:

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: llm-cluster
spec:
  headGroupSpec:
    rayStartParams:
      dashboard-host: "0.0.0.0"
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray-ml:2.41.0-gpu
          resources:
            limits:
              cpu: "8"
              memory: "32Gi"
  workerGroupSpecs:
  - replicas: 4
    groupName: gpu-workers
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray-ml:2.41.0-gpu
          resources:
            limits:
              cpu: "16"
              memory: "64Gi"
              nvidia.com/gpu: "1"

6.2 常见陷阱

对象传递大小——不要在 .remote() 参数里直接传大对象:

# 错误:每次调用都序列化 100MB 数据
large_data = load_data()  # 100MB
futures = [process.remote(large_data, i) for i in range(100)]
# → 序列化 100 次 = 10GB 网络传输

# 正确:先 put 到 Object Store,传引用
data_ref = ray.put(large_data)  # 只序列化 1 次
futures = [process.remote(data_ref, i) for i in range(100)]
# → 100 次传的只是 ObjectRef(几十字节)

Actor 死锁——避免 Actor 之间同步等待:

# 危险:Actor A 等 Actor B,B 又等 A
@ray.remote
class A:
    def __init__(self, b):
        self.b = b
    def method(self):
        return ray.get(self.b.method.remote())  # 同步等待 B

# 安全:使用异步方法
@ray.remote
class A:
    def __init__(self, b):
        self.b = b
    async def method(self):
        return await self.b.method.remote()  # 异步等待,不阻塞

资源声明不准确——GPU 声明不对会导致 OOM 或资源浪费:

# 推理任务实际只需要半块 GPU 显存
@ray.remote(num_gpus=0.5)
def inference(batch):
    ...

# 训练任务需要整块 GPU
@ray.remote(num_gpus=1)
def train(data):
    ...

6.3 监控与调试

Ray Dashboard(默认 8265 端口)提供集群全景:

  • 节点状态、CPU/GPU/内存利用率
  • Actor 列表、状态、日志
  • Job 进度和资源消耗
  • Object Store 使用情况

生产环境建议接入 Prometheus + Grafana:

# Ray 内置 Prometheus metrics 端点
# 启动时开启:ray start --metrics-export-port=8080
#
# 关键指标:
# ray_node_cpu_utilization
# ray_node_gpu_utilization
# ray_node_mem_used
# ray_actors_total
# ray_tasks_running

6.4 性能调优清单

场景 优化策略
大数据传输 ray.put() + ObjectRef,避免重复序列化
高并发推理 Actor 多副本 + max_concurrency
批处理任务 合理设置 batch_size,减少调度开销
GPU 利用率低 用 fractional GPU(num_gpus=0.5)提升 GPU 复用
长尾任务 ray.wait() 替代 ray.get() 实现渐进式处理
内存溢出 配置 object_spilling 到磁盘/S3
# ray.wait 渐进式处理——不必等最慢的任务
remaining = [task.remote(i) for i in range(100)]
results = []

while remaining:
    done, remaining = ray.wait(remaining, num_returns=1)
    results.extend(ray.get(done))
    # 每完成一个就处理一个,不被 straggler 阻塞

七、总结

Ray 的核心价值是把分布式系统的复杂性封装在三个简单原语背后。你写的是普通 Python,Ray 处理序列化、调度、故障恢复、资源管理。

对于 AI 工程师来说,Ray 的掌握优先级:

  1. 必须掌握:Task、Actor、Object 三原语,ray.put()/ray.get()/ray.wait() 的正确使用
  2. 按需掌握:Ray Serve(做推理服务时)、Ray Train(做分布式训练时)、Ray Data(做数据处理时)
  3. 了解即可:底层调度策略、GCS 架构、Object Store 实现细节

在 LLM 时代,Ray 已经是事实上的 AI Infra 底座。掌握它不只是多会一个框架,而是理解了 AI 系统从训练到推理的分布式运行方式。