Skip to content

实现 watch 与 watchEffect

本节对标 Vue 3 源码 @vue/runtime-core 中的 apiWatch.ts

watch 与 watchEffect 的区别

特性watchEffectwatch
依赖收集自动(执行时收集)显式指定 source
回调参数(newValue, oldValue)
立即执行默认立即默认不立即(可配置)
适用场景简单副作用需要新旧值对比

本质上,watchEffectwatch 都是基于 effect + scheduler 实现的。

实现 watchEffect

watchEffect 是最简单的形式 —— 创建一个 effect,立即执行,依赖变化时重新执行:

ts
// watchEffect 是简化版的 watch,不需要回调函数,直接传入副作用函数
export function watchEffect(source: () => void) {
  // cb 传 null 表示没有回调,副作用函数本身就是要执行的逻辑
  doWatch(source, null)
}

// doWatch 是 watch 和 watchEffect 的核心实现,统一处理两种 API
function doWatch(
  source: (() => any) | Ref | object, // source 支持三种类型:函数、ref、reactive 对象
  cb: ((newValue: any, oldValue: any) => void) | null, // cb 为 null 时表示 watchEffect 模式
  options: WatchOptions = {}, // 可选配置:immediate、deep、flush
) {
  // getter 是 effect 的依赖收集函数,根据 source 类型构造不同的 getter
  let getter: () => any

  if (isFunction(source)) {
    // source 是函数时,直接作为 getter 使用(如 () => state.count)
    getter = source
  } else if (isRef(source)) {
    // source 是 ref 时,getter 访问 .value 以触发依赖收集
    getter = () => source.value
  } else if (isReactive(source)) {
    // source 是 reactive 对象时,通过 traverse 递归访问所有属性来收集深层依赖
    getter = () => traverse(source)
  } else {
    // 不支持的类型,返回空函数作为兜底
    getter = () => {}
  }

  // oldValue 用于存储上一次的值,在 cb 模式下与 newValue 对比
  let oldValue: any

  // job 是实际执行回调或副作用的任务函数,会被调度器调度执行
  const job = () => {
    if (cb) {
      // watch 模式:执行 effect 获取最新值
      const newValue = _effect.run()
      // 调用回调,传入新值和旧值
      cb(newValue, oldValue)
      // 更新 oldValue 为当前值,供下次对比使用
      oldValue = newValue
    } else {
      // watchEffect 模式:直接重新执行副作用函数
      _effect.run()
    }
  }

  // 创建响应式 effect,第二个参数是 scheduler(调度器)
  const _effect = new ReactiveEffect(getter, () => {
    // scheduler:当依赖变化时,不立即执行 getter,而是调度 job
    if (options.flush === 'sync') {
      // sync 模式:同步立即执行 job,不进入队列
      job()
    } else {
      // 默认模式:将 job 推入微任务调度队列,异步批量执行
      queueJob(job)
    }
  })

  // 根据配置决定初始化行为
  if (cb) {
    if (options.immediate) {
      // immediate 为 true 时,立即执行一次回调(此时 oldValue 为 undefined)
      job()
    } else {
      // 默认不立即执行回调,先运行 effect 获取初始值作为 oldValue
      oldValue = _effect.run()
    }
  } else {
    // watchEffect 模式:立即执行一次副作用函数以收集依赖
    _effect.run()
  }

  // 返回 stop 函数,调用后停止监听,不再响应依赖变化
  return () => {
    _effect.stop()
  }
}

实现 watch

watch 需要处理多种 source 类型:

ts
// watch 函数:对 doWatch 的上层封装,显式接收 source 和 cb
export function watch<T>(
  source: (() => T) | Ref<T> | object, // 支持 getter 函数、ref 或 reactive 对象作为监听源
  cb: (newValue: T, oldValue: T) => void, // 当依赖变化时触发的回调,接收新旧值
  options?: WatchOptions, // 可选配置项
) {
  // 将 source、cb、options 统一交给 doWatch 处理
  return doWatch(source, cb, options)
}

source 的多态处理

ts
// 1. getter 函数
watch(() => obj.count, (newVal, oldVal) => { ... })

// 2. ref
watch(countRef, (newVal, oldVal) => { ... })

// 3. reactive 对象(自动深度监听)
watch(state, (newVal, oldVal) => { ... })

对于 reactive 对象,需要 traverse 递归访问所有属性以收集依赖:

ts
// traverse 递归遍历对象所有属性,触发每个属性的 getter 以收集依赖
function traverse(value: unknown, seen = new Set()) {
  // 如果不是对象类型或已经访问过(防止循环引用导致无限递归),直接返回
  if (!isObject(value) || seen.has(value)) {
    return value
  }

  // 将当前对象加入已访问集合,避免重复遍历
  seen.add(value)

  // 递归访问每个属性,触发对应的响应式 getter 进行依赖收集
  for (const key in value as object) {
    traverse((value as any)[key], seen)
  }

  return value
}

seen Set 用于防止循环引用导致的无限递归。

WatchOptions

ts
// WatchOptions 定义了 watch/watchEffect 的可选配置
interface WatchOptions {
  immediate?: boolean   // 是否立即执行回调(默认 false,仅 watch 有效)
  deep?: boolean        // 是否深度监听嵌套属性的变化
  flush?: 'pre' | 'post' | 'sync'  // 调度时机:pre=组件更新前 / post=DOM更新后 / sync=同步立即执行
}

flush 选项

  • pre(默认):在组件更新之前执行
  • post:在组件更新之后执行(DOM 已更新)
  • sync:同步执行,不进入调度队列
ts
// sync 模式:数据变化立即执行
watch(source, cb, { flush: 'sync' })

// post 模式:DOM 更新后执行
watch(source, cb, { flush: 'post' })

调度器(Scheduler)入口

watch 的 scheduler 是后续实现 nextTick 和异步调度的基础:

ts
// 任务队列:存放待执行的 job 函数
const queue: (() => void)[] = []
// 标记当前是否正在刷新队列,防止重复触发
let isFlushing = false

// 将 job 加入队列,去重后触发异步刷新
function queueJob(job: () => void) {
  // 去重:同一个 job 不会重复入队,避免同一个 watcher 在一次事件循环中重复执行
  if (!queue.includes(job)) {
    queue.push(job)
    // 触发队列刷新(如果尚未触发)
    queueFlush()
  }
}

// 异步刷新队列:利用 Promise 微任务在当前同步代码执行完毕后批量处理
function queueFlush() {
  if (!isFlushing) {
    isFlushing = true
    // Promise.resolve().then() 将 flushJobs 推入微任务队列
    Promise.resolve().then(flushJobs)
  }
}

// 依次执行队列中所有 job,执行完毕后重置状态
function flushJobs() {
  try {
    for (let i = 0; i < queue.length; i++) {
      queue[i]() // 逐个执行队列中的任务
    }
  } finally {
    // 无论是否出错,都要重置状态,保证下次能正常调度
    isFlushing = false
    queue.length = 0 // 清空队列
  }
}

这里先给出简化版的调度器,第 19 节会实现完整版。

清理副作用(onCleanup)

watch 回调中可以注册清理函数,在下次回调执行前调用:

ts
// 通过 onCleanup 处理异步请求的竞态条件
watch(id, async (newId, oldId, onCleanup) => {
  // cancelled 标记当前请求是否已被取消
  let cancelled = false
  // 注册清理函数:当 id 再次变化时,下次回调执行前会先调用此函数
  onCleanup(() => {
    cancelled = true // 将旧请求标记为已取消
  })

  // 发起异步请求获取数据
  const data = await fetchData(newId)
  if (!cancelled) {
    // 只有在请求未被取消时才使用数据,避免旧请求覆盖新请求的结果
  }
})

实现原理:

ts
// cleanup 存储上一次注册的清理函数
let cleanup: (() => void) | undefined

// onCleanup 供用户在回调中注册清理逻辑,会在下次回调执行前被调用
function onCleanup(fn: () => void) {
  cleanup = fn
}

// job 中集成 onCleanup 的调用逻辑
const job = () => {
  if (cb) {
    // 执行 effect 获取最新值
    const newValue = _effect.run()
    // 在执行新的回调之前,先调用上一次注册的清理函数
    if (cleanup) {
      cleanup()
    }
    // 执行用户回调,并将 onCleanup 作为第三个参数传入,供用户注册新的清理函数
    cb(newValue, oldValue, onCleanup)
    // 更新旧值
    oldValue = newValue
  } else {
    // watchEffect 模式:直接重新执行
    _effect.run()
  }
}

这个模式对处理竞态条件非常有用 —— 当 id 快速变化时,旧请求的结果不应覆盖新请求。

测试用例

ts
// watch 的测试套件
describe('watch', () => {
  // 测试:watch 能够监听 reactive 对象的属性变化
  it('should watch reactive object', () => {
    const state = reactive({ count: 0 }) // 创建响应式对象
    let dummy: number
    watch(
      () => state.count, // 监听 state.count 的变化
      (newVal) => {
        dummy = newVal // 回调中获取新值
      },
    )
    state.count = 1 // 触发变化
    // watch 默认是异步调度的,需要等待 nextTick(微任务)后回调才会执行
    await nextTick()
    expect(dummy!).toBe(1) // 断言回调已执行,dummy 被更新为 1
  })

  // 测试:immediate 选项让回调在 watch 创建时立即执行一次
  it('should support immediate option', () => {
    const state = reactive({ count: 0 })
    const cb = vi.fn() // 使用 vitest 的 mock 函数来追踪调用次数
    watch(() => state.count, cb, { immediate: true }) // 传入 immediate: true
    // 断言回调在 watch 创建后立即被调用了一次(无需等待 nextTick)
    expect(cb).toHaveBeenCalledTimes(1)
  })

  // 测试:watch 回调应提供新值和旧值
  it('should provide old and new value', () => {
    const state = reactive({ count: 0 })
    let oldVal: number, newVal: number
    watch(
      () => state.count,
      (n, o) => {
        newVal = n // 回调第一个参数是新值
        oldVal = o // 回调第二个参数是旧值
      },
    )
    state.count = 1 // 将 count 从 0 改为 1
    await nextTick()
    expect(newVal!).toBe(1) // 新值应为 1
    expect(oldVal!).toBe(0) // 旧值应为 0
  })
})

// watchEffect 的测试套件
describe('watchEffect', () => {
  // 测试:watchEffect 创建后立即执行一次副作用函数
  it('should run immediately', () => {
    const state = reactive({ count: 0 })
    let dummy: number
    watchEffect(() => {
      dummy = state.count // 副作用函数中访问 state.count,自动收集依赖
    })
    // watchEffect 会立即执行一次,所以 dummy 应该已经被赋值
    expect(dummy!).toBe(0)
  })

  // 测试:依赖变化后,watchEffect 会重新执行
  it('should re-run on dependency change', () => {
    const state = reactive({ count: 0 })
    let dummy: number
    watchEffect(() => {
      dummy = state.count // 自动追踪 state.count 的依赖
    })
    state.count = 1 // 修改依赖值
    await nextTick() // 等待异步调度执行
    expect(dummy!).toBe(1) // 断言副作用函数重新执行,dummy 被更新
  })

  // 测试:调用 stop 后,watchEffect 应停止监听
  it('should stop watching when stop is called', () => {
    const state = reactive({ count: 0 })
    let dummy: number
    // watchEffect 返回一个 stop 函数
    const stop = watchEffect(() => {
      dummy = state.count
    })
    expect(dummy!).toBe(0) // 初始执行后 dummy 为 0

    stop() // 调用 stop 停止监听
    state.count = 1 // 修改依赖值
    await nextTick()
    // stop 之后,副作用不再执行,dummy 应该仍然是 0
    expect(dummy!).toBe(0)
  })
})

本节小结

  1. watch / watchEffect 本质 — 都是 effect + scheduler 的上层封装
  2. source 多态 — 支持 getter 函数、ref、reactive 对象
  3. traverse — 深度监听通过递归访问所有属性实现
  4. scheduler — 通过微任务队列异步调度,避免同步阻塞
  5. onCleanup — 解决异步操作的竞态条件

下一节实现响应式进阶 API。

用心学习,用代码说话 💻