跳到主要内容

SWAI-事件埋点分析服务

一、服务概览

Event Server 是 SWAI 应用的事件埋点采集与 BI 分析服务,基于 NestJS + PostgreSQL + Redis 构建,负责全端事件数据的实时采集、缓冲写入和多维度分析查询。提供 25 项核心指标的计算能力,支撑运营数据看板和定时报表推送。


二、核心架构

2.1 请求生命周期

2.2 数据模型

event_tracker 表结构(PostgreSQL):

字段类型说明索引
idUUID主键PK
event_idVARCHAR事件类型标识组合索引
device_idVARCHAR设备 ID
user_idBIGINT用户 ID(可空)单列索引
platformINT0=Unknown/1=Android/2=iOS/3=Web/4=小程序单列索引
app_idINT应用 ID
ipVARCHAR客户端 IP单列索引
locationVARCHAR国家代码(ip-api 解析)单列索引
event_timeBIGINT事件时间戳(毫秒)组合索引
propertiesJSONB事件自定义属性GIN 索引
extrasJSONB扩展字段GIN 索引
page_urlVARCHAR页面地址
language / timezone / network_typeVARCHAR环境信息
created_atTIMESTAMP入库时间

关键索引策略:

  • (event_id, event_time) 组合索引:BI 查询的核心索引,覆盖按事件类型 + 时间范围的查询
  • JSONB GIN 索引:支持 @> 操作符的高效 JSON 属性过滤

三、事件采集与缓冲写入

3.1 事件类型体系

event_id含义关键 properties
app_version_active应用安装/激活
user_login_event登录/注册register: 0|1(1=新用户注册)
app_active应用活跃
face_work_track_eventAI 内容生成mediaType(IMAGE/VIDEO), outputNumber, timeCost, taskType
credit_track_event积分变动changeType(0=产生, 1=消耗), amount
user_recharge_track付费充值price(金额)
user_subscription_track订阅状态变更status(ACTIVE/CANCELED)

3.2 缓冲写入策略 ⭐

设计要点
  • 批量写入:600 条/批 或 3 秒定时器,双触发机制,平衡写入频率与实时性
  • IP 解析节流:仅在缓冲量低(< 45 条)时进行 IP 地理位置查询,高压力时跳过,避免外部 API 成为瓶颈
  • 死信重试:写入失败的事件重新入队,超过 10000 条上限后丢弃并报警
  • 异步非阻塞:API 响应不等待入库完成,先返回 200,后台异步持久化

四、BI 分析引擎

4.1 25 项核心指标

4.2 BI API 端点

端点功能查询维度
get_base_overview基础概览快照时间范围
get_payment_overview付费概览快照时间范围
get_registration_trend注册趋势时/日/周/月/季/年
get_generated_content_trend内容生成趋势时/日/周/月/季/年
get_payment_trend付费转化趋势时/日/周/月/季/年
get_user_retention留存分析时间范围
get_core_metrics_trend25 项指标趋势日粒度
get_core_metrics_by_days按指定日期查询逗号分隔时间戳
get_country_distribution国家 Top10 分布指标类型

五、关键技术亮点

5.1 时区处理方案 ⭐

所有 BI 查询统一使用 UTC+8 时区,通过精确的偏移量算术确保时间对齐:

核心公式:
TZ_OFFSET = 28800000 (8小时的毫秒数)

日粒度对齐(TypeScript):
alignedTimestamp = new Date(timestamp + TZ_OFFSET).setUTCHours(0,0,0,0) - TZ_OFFSET

SQL 时间截断:
DATE_TRUNC('day', TO_TIMESTAMP(event_time/1000) AT TIME ZONE 'UTC' AT TIME ZONE '+08:00')

查询结果转换:
timestamp = new Date(db_time_bucket).getTime() - TZ_OFFSET
为什么不用 setHours()?

setHours 使用服务器本地时区,在容器化部署中时区不可控,必须显式使用 UTC 操作 + 固定偏移量。

5.2 并发查询优化

25 项核心指标通过 Promise.all 并发执行 20+ 个独立数据库查询,将串行耗时压缩为最慢单次查询的耗时:

性能提升 20 倍

串行执行:~20 × 200ms = 4000ms → 并发执行:~max(200ms) ≈ 200ms,响应时间从 O(n) 降为 O(1)。

5.3 时间序列零填充

BI 查询结果可能存在某些时间点无数据的情况。通过预生成完整时间序列 + Map 映射实现零填充:

1. generateTimeRange(start, end, granularity) → [t1, t2, t3, ...]  // 完整时间轴
2. DB 结果 → Map<timestamp, value> // 稀疏数据
3. allTimestamps.map(ts => dataMap.get(ts) || 0) // 零填充

确保图表无断点,前端渲染连续。

5.4 新用户队列分析 ⭐

"充值新用户数"和"订阅新用户数"指标需要识别当日注册且当日付费的用户,通过 SQL 子查询实现队列分析:

新用户队列分析
-- 找出当日注册的用户(子查询)
SELECT DISTINCT user_id FROM event_tracker
WHERE event_id = 'user_login_event'
AND properties @> '{"register": 1}'
AND DATE_TRUNC('day', ...) = DATE_TRUNC('day', parent.event_time)

-- 在此队列中筛选当日充值的用户(主查询)
WHERE user_id IN (上述子查询)
AND event_id = 'user_recharge_track'

5.5 多级缓存架构

  • 概览接口 TTL:60 秒
  • BI 指标接口 TTL:60 秒
  • 内存缓存容量上限 5000 条

六、可观测性体系

6.1 五级 SLS 日志

日志类型Logstore记录时机关键字段
ErrorerrorAllExceptionsFilter 自动捕获traceId, error, stack, request
AccessaccessResponseInterceptor 自动记录method, path, status, responseTime
Businessbusiness手动埋点operation, module, result, businessData
Debugdebug手动埋点function, debugData, tags
PerformanceperformancePerformanceTracker 自动采集stages[], totalDuration

6.2 性能追踪

每个请求携带 PerformanceTracker 实例,记录各阶段耗时:

请求开始
├── db_query_users: 45ms
├── db_query_payments: 82ms
├── external_api_ip_lookup: 120ms
├── data_transform_metrics: 15ms
└── calculate_retention: 8ms
请求结束: 总耗时 270ms

分阶段命名规范:db_query_xxx / db_update_xxx / external_api_xxx / data_transform_xxx / calculate_xxx


七、技术栈总览

类别技术选型
框架NestJS 10 + TypeScript 5.3
数据库PostgreSQL + TypeORM 0.3
缓存Redis(L2)+ CacheableMemory(L1)
限流@nestjs/throttler(200 req/s)
日志阿里云 SLS(5 级日志体系)
文档Swagger/OpenAPI(自动生成)
地理位置ip-api.com(批量查询)
部署Docker + Kubernetes


八、技术方案深入剖析(面试细节)

8.1 事件缓冲写入:双触发 + 原子交换的细节

缓冲服务的核心在于写入吞吐量数据一致性之间的权衡:

event-buffer.service.ts
// 双触发机制
private buffer: EventEntity[] = [];
private readonly BATCH_SIZE = 600;
private readonly FLUSH_INTERVAL = 3000; // 3秒

// 触发 1: 数量阈值 — 高并发场景下快速落盘
async add(events: EventEntity[]) {
this.buffer.push(...events);
if (this.buffer.length >= this.BATCH_SIZE) {
await this.flush();
}
}

// 触发 2: 时间阈值 — 低并发场景下保证时效
onModuleInit() {
this.timer = setInterval(() => {
if (this.buffer.length > 0) this.flush();
}, this.FLUSH_INTERVAL);
}

IP 解析的自适应策略

event-buffer.service.ts
// 仅在缓冲区压力低时才进行 IP 地理位置查询
if (this.buffer.length < 45 && !!ip && !ip.startsWith('::ffff')) {
const ipInfo = await this.getIPInfo(ip); // 调用 ip-api.com
event.location = ipInfo.countryCode;
event.timezone = ipInfo.timezone;
}
// 高压力时跳过 → 优先保证写入吞吐量
面试可聊的点
  • 为什么 600 条/批? 这是 PostgreSQL 批量 INSERT 的性能甜点——太小则 IO 开销大,太大则事务锁定时间长
  • 为什么 45 条阈值决定 IP 解析? ip-api.com 每秒限制 45 请求,超过会被限流,因此以此为界
  • 异步非阻塞的代价:API 返回 200 时数据可能还在缓冲区未入库,极端情况下服务崩溃会丢数据

8.2 时区方案:三层对齐的完整链路

时区处理贯穿参数接收 → SQL 查询 → 结果转换三层,任何一层出错都会导致数据错位:

┌─────────────┐     ┌──────────────────┐     ┌────────────────┐
│ Layer 1 │ │ Layer 2 │ │ Layer 3 │
│ 输入对齐 │ → │ SQL 截断 │ → │ 输出转换 │
│ │ │ │ │ │
│ timestamp │ │ DATE_TRUNC('day', │ │ new Date( │
│ + TZ_OFFSET │ │ TO_TIMESTAMP() │ │ time_bucket │
│ setUTCHours │ │ AT TIME ZONE │ │ ).getTime() │
│ - TZ_OFFSET │ │ 'UTC' │ │ - TZ_OFFSET │
│ │ │ AT TIME ZONE │ │ │
│ │ │ '+08:00') │ │ │
└─────────────┘ └──────────────────┘ └────────────────┘

为什么不用 dayjs.tz()setHours()

  • setHours() 使用运行时系统时区,在 Docker 容器中可能是 UTC、也可能是其他值——不可靠
  • dayjs.tz() 引入额外依赖且性能不如裸算术运算
  • 固定偏移量 28800000ms 语义明确、零依赖、零歧义

8.3 新用户队列分析 SQL:相关子查询详解

这是整个系统中最复杂的 SQL 查询——识别当日注册且当日付费的用户:

相关子查询:当日注册且当日充值
SELECT
DATE_TRUNC('day', TO_TIMESTAMP(event.event_time / 1000)
AT TIME ZONE 'UTC' AT TIME ZONE '+08:00') AS time_bucket,
COUNT(DISTINCT event.user_id) AS count
FROM event_tracker event
WHERE event.event_id = 'user_recharge_track'
AND event.event_time BETWEEN $1 AND $2
AND event.user_id IN (
-- 相关子查询:子查询引用了外层的 event.event_time
SELECT DISTINCT e.user_id
FROM event_tracker e
WHERE e.event_id = 'user_login_event'
AND e.properties @> '{"register": 1}'
AND DATE_TRUNC('day', TO_TIMESTAMP(e.event_time / 1000) ...)
= DATE_TRUNC('day', TO_TIMESTAMP(event.event_time / 1000) ...)
-- ↑ 关键:确保注册日期 = 充值日期(同一天)
)
GROUP BY time_bucket
ORDER BY time_bucket;

面试可聊的点

  • 这是相关子查询(Correlated Subquery),子查询引用外层 event.event_time,每行都重新执行
  • 性能影响:对每条充值记录执行一次子查询,O(M × N) 复杂度
  • properties @> '{"register": 1}' 使用 PostgreSQL JSONB 包含操作符,走 GIN 索引

8.4 留存率计算的逐日查询模式

留存率的计算目前采用逐日循环查询模式:

event.service.ts — 留存率逐日查询
// 对每一天分别执行独立查询
for (const timestamp of allTimestamps) {
const dayStart = timestamp;
const dayEnd = timestamp + 86400000;
const nextDayStart = dayEnd;
const nextDayEnd = nextDayStart + 86400000;

// 查询 1: 当日新注册用户
const newUsers = await this.getNewUsersByDay(dayStart, dayEnd);
// 查询 2: 这些用户中次日活跃的
const retainedUsers = await this.getRetainedUsers(newUsers, nextDayStart, nextDayEnd);

result.push({
timestamp,
newUsersCount: newUsers.length,
nextDayRetainedUsers: retainedUsers.length,
nextDayRetentionRate: retainedUsers.length / newUsers.length,
});
}
性能瓶颈

查询 30 天数据需要 60 次 DB 查询(每天 2 次),365 天则需要 730 次。可通过窗口函数优化为 1 次查询,详见第九章改进建议。

8.5 缓存键策略与 Cache-Control

cache.interceptor.ts
// 缓存键生成:URL + appId + Token
const urlKey = url.replace(/\//g, ':');
// /admin/get_base_overview?start=X → "admin:get_base_overview?start=X"
const cacheKey = `nestjs:${urlKey}:${appId}:${token}`;

// HTTP Cache-Control:服务端 TTL 的一半
// TTL = 60s → max-age = 30s
// 理由:让浏览器缓存比服务端更早失效,降低脏数据风险
response.setHeader('Cache-Control', `public, max-age=${Math.floor(ttl / 1000 / 2)}`);

8.6 SLS Logger 的安全序列化

日志序列化需要处理三个异常场景——循环引用、超大对象、序列化失败:

sls-logger.service.ts
private safeStringify(obj: any, maxLength = 10000): string {
try {
const cache = new Set(); // 循环引用检测
const result = JSON.stringify(obj, (key, value) => {
if (typeof value === 'object' && value !== null) {
if (cache.has(value)) return '[Circular]'; // 替代循环引用
cache.add(value);
}
return value;
}, 2);

// 超大对象截断
return result.length > maxLength
? result.substring(0, maxLength) + '...[truncated]'
: result;
} catch (error) {
return `[Stringify Error: ${error}]`; // 兜底
}
}

九、改进建议(面试加分项)

9.1 事件缓冲区的原子交换

现状:flush 时使用 shift() 逐条取出,如果写入中途异常,已 shift 的事件丢失。

改进方案

event-buffer.service.ts — 改进方案
// 改为原子交换:先换出缓冲区,再处理
private async flush() {
const eventsToProcess = this.buffer; // 引用转移
this.buffer = []; // 原子置空,新事件写入新数组

try {
await this.repository.save(eventsToProcess);
} catch (error) {
// 失败时安全回写
this.buffer = [...eventsToProcess, ...this.buffer];
}
}

9.2 留存率查询优化:窗口函数替代逐日查询

现状:30 天留存需要 60 次查询,365 天需要 730 次。

改进方案:用 SQL 窗口函数一次查出:

留存率优化:CTE + LEFT JOIN 替代逐日查询
WITH daily_new_users AS (
SELECT DATE_TRUNC('day', ...) AS reg_day, user_id
FROM event_tracker
WHERE event_id = 'user_login_event'
AND properties @> '{"register": 1}'
AND event_time BETWEEN $1 AND $2
GROUP BY reg_day, user_id
),
daily_active AS (
SELECT DATE_TRUNC('day', ...) AS active_day, user_id
FROM event_tracker
WHERE event_id IN ('user_login_event', 'app_active')
GROUP BY active_day, user_id
)
SELECT
d.reg_day,
COUNT(DISTINCT d.user_id) AS new_users,
COUNT(DISTINCT CASE WHEN a.active_day = d.reg_day + INTERVAL '1 day'
THEN d.user_id END) AS next_day_retained,
COUNT(DISTINCT CASE WHEN a.active_day = d.reg_day + INTERVAL '7 days'
THEN d.user_id END) AS day7_retained
FROM daily_new_users d
LEFT JOIN daily_active a ON d.user_id = a.user_id
GROUP BY d.reg_day;
-- 1 次查询替代 730 次,性能提升两个数量级

9.3 IP 解析熔断器

现状:ip-api.com 请求失败静默返回空对象,无监控无告警。

改进方案

ip-service-breaker.ts
// 引入熔断器模式
class IPServiceBreaker {
private failures = 0;
private lastFailure = 0;
private readonly threshold = 5; // 连续 5 次失败触发熔断
private readonly resetTimeout = 30000; // 30 秒后半开

async getIPInfo(ip: string) {
if (this.isOpen()) return null; // 熔断状态直接跳过
try {
const result = await axios.get(`http://ip-api.com/json/${ip}`, { timeout: 3000 });
this.failures = 0; // 成功则重置
return result.data;
} catch {
this.failures++;
this.lastFailure = Date.now();
return null;
}
}

private isOpen() {
return this.failures >= this.threshold
&& Date.now() - this.lastFailure < this.resetTimeout;
}
}

9.4 其他可讨论的改进方向

方向现状改进建议
物化视图每次 BI 查询实时聚合创建 daily_metrics 物化视图,定时刷新,BI 查询变为简单 SELECT
事件 ID 常量化字符串散落在代码中提取为 EVENT_IDS 常量对象,IDE 自动补全、防止拼写错误
SQL 模板化DATE_TRUNC 表达式重复 20+ 次封装为 TimeZoneService.dateTruncSQL(granularity)
缓存失效策略纯 TTL 过期写入新事件时主动 invalidate 相关缓存 tag
覆盖索引每次查询回表添加 (event_id, event_time, user_id) 覆盖索引减少 IO
Graceful Shutdown服务关闭时 flush 可能未完成onModuleDestroy + await flush() 保证缓冲区排空

十、面试亮点总结

  1. 事件缓冲批量写入:异步非阻塞 + 600 条/批 + 3 秒双触发 + 死信重试机制
  2. 时区一致性方案:三层对齐(输入/SQL/输出),基于 UTC + 固定偏移量,避免容器时区依赖
  3. 并发查询优化:Promise.all 并行 20+ 数据库查询,响应时间从 O(n) 降为 O(1)
  4. 队列分析 SQL:相关子查询实现"当日注册且当日付费"的同队列识别
  5. 时间序列零填充:预生成时间轴 + Map 映射确保图表数据完整性
  6. 多级缓存:L1 内存 + L2 Redis 两级缓存,HTTP Cache-Control = TTL/2 的降级策略
  7. 全链路可观测:traceId 贯穿 + 5 级 SLS 日志 + PerformanceTracker 分阶段性能追踪
  8. IP 解析自适应:根据缓冲区水位动态决定是否进行地理位置解析,避免外部 API 成为瓶颈
  9. 安全序列化:循环引用检测 + 超大对象截断 + 序列化异常兜底
  10. 改进空间:留存查询可用窗口函数优化 730→1 次查询、缓冲区可用原子交换替代逐条 shift、IP 服务可加熔断器