SWAI-事件埋点分析服务
一、服务概览
Event Server 是 SWAI 应用的事件埋点采集与 BI 分析服务,基于 NestJS + PostgreSQL + Redis 构建,负责全端事件数据的实时采集、缓冲写入和多维度分析查询。提供 25 项核心指标的计算能力,支撑运营数据看板和定时报表推送。
二、核心架构
2.1 请求生命周期
2.2 数据模型
event_tracker 表结构(PostgreSQL):
| 字段 | 类型 | 说明 | 索引 |
|---|---|---|---|
| id | UUID | 主键 | PK |
| event_id | VARCHAR | 事件类型标识 | 组合索引 |
| device_id | VARCHAR | 设备 ID | |
| user_id | BIGINT | 用户 ID(可空) | 单列索引 |
| platform | INT | 0=Unknown/1=Android/2=iOS/3=Web/4=小程序 | 单列索引 |
| app_id | INT | 应用 ID | |
| ip | VARCHAR | 客户端 IP | 单列索引 |
| location | VARCHAR | 国家代码(ip-api 解析) | 单列索引 |
| event_time | BIGINT | 事件时间戳(毫秒) | 组合索引 |
| properties | JSONB | 事件自定义属性 | GIN 索引 |
| extras | JSONB | 扩展字段 | GIN 索引 |
| page_url | VARCHAR | 页面地址 | |
| language / timezone / network_type | VARCHAR | 环境信息 | |
| created_at | TIMESTAMP | 入库时间 |
关键索引策略:
(event_id, event_time)组合索引:BI 查询的核心索引,覆盖按事件类型 + 时间范围的查询JSONB GIN 索引:支持@>操作符的高效 JSON 属性过滤
三、事件采集与缓冲写入
3.1 事件类型体系
| event_id | 含义 | 关键 properties |
|---|---|---|
app_version_active | 应用安装/激活 | — |
user_login_event | 登录/注册 | register: 0|1(1=新用户注册) |
app_active | 应用活跃 | — |
face_work_track_event | AI 内容生成 | mediaType(IMAGE/VIDEO), outputNumber, timeCost, taskType |
credit_track_event | 积分变动 | changeType(0=产生, 1=消耗), amount |
user_recharge_track | 付费充值 | price(金额) |
user_subscription_track | 订阅状态变更 | status(ACTIVE/CANCELED) |
3.2 缓冲写入策略 ⭐
- 批量写入:600 条/批 或 3 秒定时器,双触发机制,平衡写入频率与实时性
- IP 解析节流:仅在缓冲量低(< 45 条)时进行 IP 地理位置查询,高压力时跳过,避免外部 API 成为瓶颈
- 死信重试:写入失败的事件重新入队,超过 10000 条上限后丢弃并报警
- 异步非阻塞:API 响应不等待入库完成,先返回 200,后台异步持久化
四、BI 分析引擎
4.1 25 项核心指标
4.2 BI API 端点
| 端点 | 功能 | 查询维度 |
|---|---|---|
get_base_overview | 基础概览快照 | 时间范围 |
get_payment_overview | 付费概览快照 | 时间范围 |
get_registration_trend | 注册趋势 | 时/日/周/月/季/年 |
get_generated_content_trend | 内容生成趋势 | 时/日/周/月/季/年 |
get_payment_trend | 付费转化趋势 | 时/日/周/月/季/年 |
get_user_retention | 留存分析 | 时间范围 |
get_core_metrics_trend | 25 项指标趋势 | 日粒度 |
get_core_metrics_by_days | 按指定日期查询 | 逗号分隔时间戳 |
get_country_distribution | 国家 Top10 分布 | 指标类型 |
五、关键技术亮点
5.1 时区处理方案 ⭐
所有 BI 查询统一使用 UTC+8 时区,通过精确的偏移量算术确保时间对齐:
核心公式:
TZ_OFFSET = 28800000 (8小时的毫秒数)
日粒度对齐(TypeScript):
alignedTimestamp = new Date(timestamp + TZ_OFFSET).setUTCHours(0,0,0,0) - TZ_OFFSET
SQL 时间截断:
DATE_TRUNC('day', TO_TIMESTAMP(event_time/1000) AT TIME ZONE 'UTC' AT TIME ZONE '+08:00')
查询结果转换:
timestamp = new Date(db_time_bucket).getTime() - TZ_OFFSET
setHours 使用服务器本地时区,在容器化部署中时区不可控,必须显式使用 UTC 操作 + 固定偏移量。
5.2 并发查询优化
25 项核心指标通过 Promise.all 并发执行 20+ 个独立数据库查询,将串行耗时压缩为最慢单次查询的耗时:
串行执行:~20 × 200ms = 4000ms → 并发执行:~max(200ms) ≈ 200ms,响应时间从 O(n) 降为 O(1)。
5.3 时间序列零填充
BI 查询结果可能存在某些时间点无数据的情况。通过预生成完整时间序列 + Map 映射实现零填充:
1. generateTimeRange(start, end, granularity) → [t1, t2, t3, ...] // 完整时间轴
2. DB 结果 → Map<timestamp, value> // 稀疏数据
3. allTimestamps.map(ts => dataMap.get(ts) || 0) // 零填充
确保图表无断点,前端渲染连续。
5.4 新用户队列分析 ⭐
"充值新用户数"和"订阅新用户数"指标需要识别当日注册且当日付费的用户,通过 SQL 子查询实现队列分析:
-- 找出当日注册的用户(子查询)
SELECT DISTINCT user_id FROM event_tracker
WHERE event_id = 'user_login_event'
AND properties @> '{"register": 1}'
AND DATE_TRUNC('day', ...) = DATE_TRUNC('day', parent.event_time)
-- 在此队列中筛选当日充值的用户(主查询)
WHERE user_id IN (上述子查询)
AND event_id = 'user_recharge_track'
5.5 多级缓存架构
- 概览接口 TTL:60 秒
- BI 指标接口 TTL:60 秒
- 内存缓存容量上限 5000 条
六、可观测性体系
6.1 五级 SLS 日志
| 日志类型 | Logstore | 记录时机 | 关键字段 |
|---|---|---|---|
| Error | error | AllExceptionsFilter 自动捕获 | traceId, error, stack, request |
| Access | access | ResponseInterceptor 自动记录 | method, path, status, responseTime |
| Business | business | 手动埋点 | operation, module, result, businessData |
| Debug | debug | 手动埋点 | function, debugData, tags |
| Performance | performance | PerformanceTracker 自动采集 | stages[], totalDuration |
6.2 性能追踪
每个请求携带 PerformanceTracker 实例,记录各阶段耗时:
请求开始
├── db_query_users: 45ms
├── db_query_payments: 82ms
├── external_api_ip_lookup: 120ms
├── data_transform_metrics: 15ms
└── calculate_retention: 8ms
请求结束: 总耗时 270ms
分阶段命名规范:db_query_xxx / db_update_xxx / external_api_xxx / data_transform_xxx / calculate_xxx
七、技术栈总览
| 类别 | 技术选型 |
|---|---|
| 框架 | NestJS 10 + TypeScript 5.3 |
| 数据库 | PostgreSQL + TypeORM 0.3 |
| 缓存 | Redis(L2)+ CacheableMemory(L1) |
| 限流 | @nestjs/throttler(200 req/s) |
| 日志 | 阿里云 SLS(5 级日志体系) |
| 文档 | Swagger/OpenAPI(自动生成) |
| 地理位置 | ip-api.com(批量查询) |
| 部署 | Docker + Kubernetes |
八、技术方案深入剖析(面试细节)
8.1 事件缓冲写入:双触发 + 原子交换的细节
缓冲服务的核心在于写入吞吐量和数据一致性之间的权衡:
// 双触发机制
private buffer: EventEntity[] = [];
private readonly BATCH_SIZE = 600;
private readonly FLUSH_INTERVAL = 3000; // 3秒
// 触发 1: 数量阈值 — 高并发场景下快速落盘
async add(events: EventEntity[]) {
this.buffer.push(...events);
if (this.buffer.length >= this.BATCH_SIZE) {
await this.flush();
}
}
// 触发 2: 时间阈值 — 低并发场景下保证时效
onModuleInit() {
this.timer = setInterval(() => {
if (this.buffer.length > 0) this.flush();
}, this.FLUSH_INTERVAL);
}
IP 解析的自适应策略:
// 仅在缓冲区压力低时才进行 IP 地理位置查询
if (this.buffer.length < 45 && !!ip && !ip.startsWith('::ffff')) {
const ipInfo = await this.getIPInfo(ip); // 调用 ip-api.com
event.location = ipInfo.countryCode;
event.timezone = ipInfo.timezone;
}
// 高压力时跳过 → 优先保证写入吞吐量
- 为什么 600 条/批? 这是 PostgreSQL 批量 INSERT 的性能甜点——太小则 IO 开销大,太大则事务锁定时间长
- 为什么 45 条阈值决定 IP 解析? ip-api.com 每秒限制 45 请求,超过会被限流,因此以此为界
- 异步非阻塞的代价:API 返回 200 时数据可能还在缓冲区未入库,极端情况下服务崩溃会丢数据
8.2 时区方案:三层对齐的完整链路
时区处理贯穿参数接收 → SQL 查询 → 结果转换三层,任何一层出错都会导致数据错位:
┌─────────────┐ ┌──────────────────┐ ┌────────────────┐
│ Layer 1 │ │ Layer 2 │ │ Layer 3 │
│ 输入对齐 │ → │ SQL 截断 │ → │ 输出转换 │
│ │ │ │ │ │
│ timestamp │ │ DATE_TRUNC('day', │ │ new Date( │
│ + TZ_OFFSET │ │ TO_TIMESTAMP() │ │ time_bucket │
│ setUTCHours │ │ AT TIME ZONE │ │ ).getTime() │
│ - TZ_OFFSET │ │ 'UTC' │ │ - TZ_OFFSET │
│ │ │ AT TIME ZONE │ │ │
│ │ │ '+08:00') │ │ │
└─────────────┘ └──────────────────┘ └────────────────┘
为什么不用 dayjs.tz() 或 setHours()?
setHours()使用运行时系统时区,在 Docker 容器中可能是 UTC、也可能是其他值——不可靠dayjs.tz()引入额外依赖且性能不如裸算术运算- 固定偏移量
28800000ms语义明确、零依赖、零歧义
8.3 新用户队列分析 SQL:相关子查询详解
这是整个系统中最复杂的 SQL 查询——识别当日注册且当日付费的用户:
SELECT
DATE_TRUNC('day', TO_TIMESTAMP(event.event_time / 1000)
AT TIME ZONE 'UTC' AT TIME ZONE '+08:00') AS time_bucket,
COUNT(DISTINCT event.user_id) AS count
FROM event_tracker event
WHERE event.event_id = 'user_recharge_track'
AND event.event_time BETWEEN $1 AND $2
AND event.user_id IN (
-- 相关子查询:子查询引用了外层的 event.event_time
SELECT DISTINCT e.user_id
FROM event_tracker e
WHERE e.event_id = 'user_login_event'
AND e.properties @> '{"register": 1}'
AND DATE_TRUNC('day', TO_TIMESTAMP(e.event_time / 1000) ...)
= DATE_TRUNC('day', TO_TIMESTAMP(event.event_time / 1000) ...)
-- ↑ 关键:确保注册日期 = 充值日期(同一天)
)
GROUP BY time_bucket
ORDER BY time_bucket;
面试可聊的点:
- 这是相关子查询(Correlated Subquery),子查询引用外层
event.event_time,每行都重新执行 - 性能影响:对每条充值记录执行一次子查询,O(M × N) 复杂度
properties @> '{"register": 1}'使用 PostgreSQL JSONB 包含操作符,走 GIN 索引
8.4 留存率计算的逐日查询模式
留存率的计算目前采用逐日循环查询模式:
// 对每一天分别执行独立查询
for (const timestamp of allTimestamps) {
const dayStart = timestamp;
const dayEnd = timestamp + 86400000;
const nextDayStart = dayEnd;
const nextDayEnd = nextDayStart + 86400000;
// 查询 1: 当日新注册用户
const newUsers = await this.getNewUsersByDay(dayStart, dayEnd);
// 查询 2: 这些用户中次日活跃的
const retainedUsers = await this.getRetainedUsers(newUsers, nextDayStart, nextDayEnd);
result.push({
timestamp,
newUsersCount: newUsers.length,
nextDayRetainedUsers: retainedUsers.length,
nextDayRetentionRate: retainedUsers.length / newUsers.length,
});
}
查询 30 天数据需要 60 次 DB 查询(每天 2 次),365 天则需要 730 次。可通过窗口函数优化为 1 次查询,详见第九章改进建议。
8.5 缓存键策略与 Cache-Control
// 缓存键生成:URL + appId + Token
const urlKey = url.replace(/\//g, ':');
// /admin/get_base_overview?start=X → "admin:get_base_overview?start=X"
const cacheKey = `nestjs:${urlKey}:${appId}:${token}`;
// HTTP Cache-Control:服务端 TTL 的一半
// TTL = 60s → max-age = 30s
// 理由:让浏览器缓存比服务端更早失效,降低脏数据风险
response.setHeader('Cache-Control', `public, max-age=${Math.floor(ttl / 1000 / 2)}`);
8.6 SLS Logger 的安全序列化
日志序列化需要处理三个异常场景——循环引用、超大对象、序列化失败:
private safeStringify(obj: any, maxLength = 10000): string {
try {
const cache = new Set(); // 循环引用检测
const result = JSON.stringify(obj, (key, value) => {
if (typeof value === 'object' && value !== null) {
if (cache.has(value)) return '[Circular]'; // 替代循环引用
cache.add(value);
}
return value;
}, 2);
// 超大对象截断
return result.length > maxLength
? result.substring(0, maxLength) + '...[truncated]'
: result;
} catch (error) {
return `[Stringify Error: ${error}]`; // 兜底
}
}
九、改进建议(面试加分项)
9.1 事件缓冲区的原子交换
现状:flush 时使用 shift() 逐条取出,如果写入中途异常,已 shift 的事件丢失。
改进方案:
// 改为原子交换:先换出缓冲区,再处理
private async flush() {
const eventsToProcess = this.buffer; // 引用转移
this.buffer = []; // 原子置空,新事件写入新数组
try {
await this.repository.save(eventsToProcess);
} catch (error) {
// 失败时安全回写
this.buffer = [...eventsToProcess, ...this.buffer];
}
}
9.2 留存率查询优化:窗口函数替代逐日查询
现状:30 天留存需要 60 次查询,365 天需要 730 次。
改进方案:用 SQL 窗口函数一次查出:
WITH daily_new_users AS (
SELECT DATE_TRUNC('day', ...) AS reg_day, user_id
FROM event_tracker
WHERE event_id = 'user_login_event'
AND properties @> '{"register": 1}'
AND event_time BETWEEN $1 AND $2
GROUP BY reg_day, user_id
),
daily_active AS (
SELECT DATE_TRUNC('day', ...) AS active_day, user_id
FROM event_tracker
WHERE event_id IN ('user_login_event', 'app_active')
GROUP BY active_day, user_id
)
SELECT
d.reg_day,
COUNT(DISTINCT d.user_id) AS new_users,
COUNT(DISTINCT CASE WHEN a.active_day = d.reg_day + INTERVAL '1 day'
THEN d.user_id END) AS next_day_retained,
COUNT(DISTINCT CASE WHEN a.active_day = d.reg_day + INTERVAL '7 days'
THEN d.user_id END) AS day7_retained
FROM daily_new_users d
LEFT JOIN daily_active a ON d.user_id = a.user_id
GROUP BY d.reg_day;
-- 1 次查询替代 730 次,性能提升两个数量级
9.3 IP 解析熔断器
现状:ip-api.com 请求失败静默返回空对象,无监控无告警。
改进方案:
// 引入熔断器模式
class IPServiceBreaker {
private failures = 0;
private lastFailure = 0;
private readonly threshold = 5; // 连续 5 次失败触发熔断
private readonly resetTimeout = 30000; // 30 秒后半开
async getIPInfo(ip: string) {
if (this.isOpen()) return null; // 熔断状态直接跳过
try {
const result = await axios.get(`http://ip-api.com/json/${ip}`, { timeout: 3000 });
this.failures = 0; // 成功则重置
return result.data;
} catch {
this.failures++;
this.lastFailure = Date.now();
return null;
}
}
private isOpen() {
return this.failures >= this.threshold
&& Date.now() - this.lastFailure < this.resetTimeout;
}
}
9.4 其他可讨论的改进方向
| 方向 | 现状 | 改进建议 |
|---|---|---|
| 物化视图 | 每次 BI 查询实时聚合 | 创建 daily_metrics 物化视图,定时刷新,BI 查询变为简单 SELECT |
| 事件 ID 常量化 | 字符串散落在代码中 | 提取为 EVENT_IDS 常量对象,IDE 自动补全、防止拼写错误 |
| SQL 模板化 | DATE_TRUNC 表达式重复 20+ 次 | 封装为 TimeZoneService.dateTruncSQL(granularity) |
| 缓存失效策略 | 纯 TTL 过期 | 写入新事件时主动 invalidate 相关缓存 tag |
| 覆盖索引 | 每次查询回表 | 添加 (event_id, event_time, user_id) 覆盖索引减少 IO |
| Graceful Shutdown | 服务关闭时 flush 可能未完成 | 用 onModuleDestroy + await flush() 保证缓冲区排空 |
十、面试亮点总结
- 事件缓冲批量写入:异步非阻塞 + 600 条/批 + 3 秒双触发 + 死信重试机制
- 时区一致性方案:三层对齐(输入/SQL/输出),基于 UTC + 固定偏移量,避免容器时区依赖
- 并发查询优化:Promise.all 并行 20+ 数据库查询,响应时间从 O(n) 降为 O(1)
- 队列分析 SQL:相关子查询实现"当日注册且当日付费"的同队列识别
- 时间序列零填充:预生成时间轴 + Map 映射确保图表数据完整性
- 多级缓存:L1 内存 + L2 Redis 两级缓存,HTTP Cache-Control = TTL/2 的降级策略
- 全链路可观测:traceId 贯穿 + 5 级 SLS 日志 + PerformanceTracker 分阶段性能追踪
- IP 解析自适应:根据缓冲区水位动态决定是否进行地理位置解析,避免外部 API 成为瓶颈
- 安全序列化:循环引用检测 + 超大对象截断 + 序列化异常兜底
- 改进空间:留存查询可用窗口函数优化 730→1 次查询、缓冲区可用原子交换替代逐条 shift、IP 服务可加熔断器