主题
实现 watch 与 watchEffect
本节对标 Vue 3 源码
@vue/runtime-core中的apiWatch.ts
watch 与 watchEffect 的区别
| 特性 | watchEffect | watch |
|---|---|---|
| 依赖收集 | 自动(执行时收集) | 显式指定 source |
| 回调参数 | 无 | (newValue, oldValue) |
| 立即执行 | 默认立即 | 默认不立即(可配置) |
| 适用场景 | 简单副作用 | 需要新旧值对比 |
本质上,watchEffect 和 watch 都是基于 effect + scheduler 实现的。
实现 watchEffect
watchEffect 是最简单的形式 —— 创建一个 effect,立即执行,依赖变化时重新执行:
ts
// watchEffect 是简化版的 watch,不需要回调函数,直接传入副作用函数
export function watchEffect(source: () => void) {
// cb 传 null 表示没有回调,副作用函数本身就是要执行的逻辑
doWatch(source, null)
}
// doWatch 是 watch 和 watchEffect 的核心实现,统一处理两种 API
function doWatch(
source: (() => any) | Ref | object, // source 支持三种类型:函数、ref、reactive 对象
cb: ((newValue: any, oldValue: any) => void) | null, // cb 为 null 时表示 watchEffect 模式
options: WatchOptions = {}, // 可选配置:immediate、deep、flush
) {
// getter 是 effect 的依赖收集函数,根据 source 类型构造不同的 getter
let getter: () => any
if (isFunction(source)) {
// source 是函数时,直接作为 getter 使用(如 () => state.count)
getter = source
} else if (isRef(source)) {
// source 是 ref 时,getter 访问 .value 以触发依赖收集
getter = () => source.value
} else if (isReactive(source)) {
// source 是 reactive 对象时,通过 traverse 递归访问所有属性来收集深层依赖
getter = () => traverse(source)
} else {
// 不支持的类型,返回空函数作为兜底
getter = () => {}
}
// oldValue 用于存储上一次的值,在 cb 模式下与 newValue 对比
let oldValue: any
// job 是实际执行回调或副作用的任务函数,会被调度器调度执行
const job = () => {
if (cb) {
// watch 模式:执行 effect 获取最新值
const newValue = _effect.run()
// 调用回调,传入新值和旧值
cb(newValue, oldValue)
// 更新 oldValue 为当前值,供下次对比使用
oldValue = newValue
} else {
// watchEffect 模式:直接重新执行副作用函数
_effect.run()
}
}
// 创建响应式 effect,第二个参数是 scheduler(调度器)
const _effect = new ReactiveEffect(getter, () => {
// scheduler:当依赖变化时,不立即执行 getter,而是调度 job
if (options.flush === 'sync') {
// sync 模式:同步立即执行 job,不进入队列
job()
} else {
// 默认模式:将 job 推入微任务调度队列,异步批量执行
queueJob(job)
}
})
// 根据配置决定初始化行为
if (cb) {
if (options.immediate) {
// immediate 为 true 时,立即执行一次回调(此时 oldValue 为 undefined)
job()
} else {
// 默认不立即执行回调,先运行 effect 获取初始值作为 oldValue
oldValue = _effect.run()
}
} else {
// watchEffect 模式:立即执行一次副作用函数以收集依赖
_effect.run()
}
// 返回 stop 函数,调用后停止监听,不再响应依赖变化
return () => {
_effect.stop()
}
}实现 watch
watch 需要处理多种 source 类型:
ts
// watch 函数:对 doWatch 的上层封装,显式接收 source 和 cb
export function watch<T>(
source: (() => T) | Ref<T> | object, // 支持 getter 函数、ref 或 reactive 对象作为监听源
cb: (newValue: T, oldValue: T) => void, // 当依赖变化时触发的回调,接收新旧值
options?: WatchOptions, // 可选配置项
) {
// 将 source、cb、options 统一交给 doWatch 处理
return doWatch(source, cb, options)
}source 的多态处理
ts
// 1. getter 函数
watch(() => obj.count, (newVal, oldVal) => { ... })
// 2. ref
watch(countRef, (newVal, oldVal) => { ... })
// 3. reactive 对象(自动深度监听)
watch(state, (newVal, oldVal) => { ... })对于 reactive 对象,需要 traverse 递归访问所有属性以收集依赖:
ts
// traverse 递归遍历对象所有属性,触发每个属性的 getter 以收集依赖
function traverse(value: unknown, seen = new Set()) {
// 如果不是对象类型或已经访问过(防止循环引用导致无限递归),直接返回
if (!isObject(value) || seen.has(value)) {
return value
}
// 将当前对象加入已访问集合,避免重复遍历
seen.add(value)
// 递归访问每个属性,触发对应的响应式 getter 进行依赖收集
for (const key in value as object) {
traverse((value as any)[key], seen)
}
return value
}seen Set 用于防止循环引用导致的无限递归。
WatchOptions
ts
// WatchOptions 定义了 watch/watchEffect 的可选配置
interface WatchOptions {
immediate?: boolean // 是否立即执行回调(默认 false,仅 watch 有效)
deep?: boolean // 是否深度监听嵌套属性的变化
flush?: 'pre' | 'post' | 'sync' // 调度时机:pre=组件更新前 / post=DOM更新后 / sync=同步立即执行
}flush 选项
pre(默认):在组件更新之前执行post:在组件更新之后执行(DOM 已更新)sync:同步执行,不进入调度队列
ts
// sync 模式:数据变化立即执行
watch(source, cb, { flush: 'sync' })
// post 模式:DOM 更新后执行
watch(source, cb, { flush: 'post' })调度器(Scheduler)入口
watch 的 scheduler 是后续实现 nextTick 和异步调度的基础:
ts
// 任务队列:存放待执行的 job 函数
const queue: (() => void)[] = []
// 标记当前是否正在刷新队列,防止重复触发
let isFlushing = false
// 将 job 加入队列,去重后触发异步刷新
function queueJob(job: () => void) {
// 去重:同一个 job 不会重复入队,避免同一个 watcher 在一次事件循环中重复执行
if (!queue.includes(job)) {
queue.push(job)
// 触发队列刷新(如果尚未触发)
queueFlush()
}
}
// 异步刷新队列:利用 Promise 微任务在当前同步代码执行完毕后批量处理
function queueFlush() {
if (!isFlushing) {
isFlushing = true
// Promise.resolve().then() 将 flushJobs 推入微任务队列
Promise.resolve().then(flushJobs)
}
}
// 依次执行队列中所有 job,执行完毕后重置状态
function flushJobs() {
try {
for (let i = 0; i < queue.length; i++) {
queue[i]() // 逐个执行队列中的任务
}
} finally {
// 无论是否出错,都要重置状态,保证下次能正常调度
isFlushing = false
queue.length = 0 // 清空队列
}
}这里先给出简化版的调度器,第 19 节会实现完整版。
清理副作用(onCleanup)
watch 回调中可以注册清理函数,在下次回调执行前调用:
ts
// 通过 onCleanup 处理异步请求的竞态条件
watch(id, async (newId, oldId, onCleanup) => {
// cancelled 标记当前请求是否已被取消
let cancelled = false
// 注册清理函数:当 id 再次变化时,下次回调执行前会先调用此函数
onCleanup(() => {
cancelled = true // 将旧请求标记为已取消
})
// 发起异步请求获取数据
const data = await fetchData(newId)
if (!cancelled) {
// 只有在请求未被取消时才使用数据,避免旧请求覆盖新请求的结果
}
})实现原理:
ts
// cleanup 存储上一次注册的清理函数
let cleanup: (() => void) | undefined
// onCleanup 供用户在回调中注册清理逻辑,会在下次回调执行前被调用
function onCleanup(fn: () => void) {
cleanup = fn
}
// job 中集成 onCleanup 的调用逻辑
const job = () => {
if (cb) {
// 执行 effect 获取最新值
const newValue = _effect.run()
// 在执行新的回调之前,先调用上一次注册的清理函数
if (cleanup) {
cleanup()
}
// 执行用户回调,并将 onCleanup 作为第三个参数传入,供用户注册新的清理函数
cb(newValue, oldValue, onCleanup)
// 更新旧值
oldValue = newValue
} else {
// watchEffect 模式:直接重新执行
_effect.run()
}
}这个模式对处理竞态条件非常有用 —— 当 id 快速变化时,旧请求的结果不应覆盖新请求。
测试用例
ts
// watch 的测试套件
describe('watch', () => {
// 测试:watch 能够监听 reactive 对象的属性变化
it('should watch reactive object', () => {
const state = reactive({ count: 0 }) // 创建响应式对象
let dummy: number
watch(
() => state.count, // 监听 state.count 的变化
(newVal) => {
dummy = newVal // 回调中获取新值
},
)
state.count = 1 // 触发变化
// watch 默认是异步调度的,需要等待 nextTick(微任务)后回调才会执行
await nextTick()
expect(dummy!).toBe(1) // 断言回调已执行,dummy 被更新为 1
})
// 测试:immediate 选项让回调在 watch 创建时立即执行一次
it('should support immediate option', () => {
const state = reactive({ count: 0 })
const cb = vi.fn() // 使用 vitest 的 mock 函数来追踪调用次数
watch(() => state.count, cb, { immediate: true }) // 传入 immediate: true
// 断言回调在 watch 创建后立即被调用了一次(无需等待 nextTick)
expect(cb).toHaveBeenCalledTimes(1)
})
// 测试:watch 回调应提供新值和旧值
it('should provide old and new value', () => {
const state = reactive({ count: 0 })
let oldVal: number, newVal: number
watch(
() => state.count,
(n, o) => {
newVal = n // 回调第一个参数是新值
oldVal = o // 回调第二个参数是旧值
},
)
state.count = 1 // 将 count 从 0 改为 1
await nextTick()
expect(newVal!).toBe(1) // 新值应为 1
expect(oldVal!).toBe(0) // 旧值应为 0
})
})
// watchEffect 的测试套件
describe('watchEffect', () => {
// 测试:watchEffect 创建后立即执行一次副作用函数
it('should run immediately', () => {
const state = reactive({ count: 0 })
let dummy: number
watchEffect(() => {
dummy = state.count // 副作用函数中访问 state.count,自动收集依赖
})
// watchEffect 会立即执行一次,所以 dummy 应该已经被赋值
expect(dummy!).toBe(0)
})
// 测试:依赖变化后,watchEffect 会重新执行
it('should re-run on dependency change', () => {
const state = reactive({ count: 0 })
let dummy: number
watchEffect(() => {
dummy = state.count // 自动追踪 state.count 的依赖
})
state.count = 1 // 修改依赖值
await nextTick() // 等待异步调度执行
expect(dummy!).toBe(1) // 断言副作用函数重新执行,dummy 被更新
})
// 测试:调用 stop 后,watchEffect 应停止监听
it('should stop watching when stop is called', () => {
const state = reactive({ count: 0 })
let dummy: number
// watchEffect 返回一个 stop 函数
const stop = watchEffect(() => {
dummy = state.count
})
expect(dummy!).toBe(0) // 初始执行后 dummy 为 0
stop() // 调用 stop 停止监听
state.count = 1 // 修改依赖值
await nextTick()
// stop 之后,副作用不再执行,dummy 应该仍然是 0
expect(dummy!).toBe(0)
})
})本节小结
- watch / watchEffect 本质 — 都是
effect+scheduler的上层封装 - source 多态 — 支持 getter 函数、ref、reactive 对象
- traverse — 深度监听通过递归访问所有属性实现
- scheduler — 通过微任务队列异步调度,避免同步阻塞
- onCleanup — 解决异步操作的竞态条件
下一节实现响应式进阶 API。