跳到主要内容

数据库连接

问题

Node.js 如何连接数据库?什么是连接池?ORM 和原生查询有什么区别?

答案

Node.js 可以连接各种关系型和非关系型数据库。使用连接池复用连接提高性能,ORM 提供对象映射简化操作。


MySQL 连接

原生驱动 mysql2

import mysql from 'mysql2/promise';

// 创建连接池
const pool = mysql.createPool({
host: 'localhost',
port: 3306,
user: 'root',
password: 'password',
database: 'mydb',
waitForConnections: true,
connectionLimit: 10, // 最大连接数
queueLimit: 0, // 排队无限制
enableKeepAlive: true,
keepAliveInitialDelay: 0
});

// 查询
async function getUsers() {
const [rows] = await pool.query('SELECT * FROM users');
return rows;
}

// 参数化查询(防注入)
async function getUserById(id: number) {
const [rows] = await pool.query(
'SELECT * FROM users WHERE id = ?',
[id]
);
return rows[0];
}

// 事务
async function transfer(fromId: number, toId: number, amount: number) {
const connection = await pool.getConnection();

try {
await connection.beginTransaction();

await connection.query(
'UPDATE accounts SET balance = balance - ? WHERE id = ?',
[amount, fromId]
);

await connection.query(
'UPDATE accounts SET balance = balance + ? WHERE id = ?',
[amount, toId]
);

await connection.commit();
} catch (err) {
await connection.rollback();
throw err;
} finally {
connection.release(); // 释放回连接池
}
}

PostgreSQL 连接

import { Pool, PoolClient } from 'pg';

const pool = new Pool({
host: 'localhost',
port: 5432,
user: 'postgres',
password: 'password',
database: 'mydb',
max: 20, // 最大连接数
idleTimeoutMillis: 30000, // 空闲超时
connectionTimeoutMillis: 2000
});

// 查询
async function getUsers() {
const result = await pool.query('SELECT * FROM users');
return result.rows;
}

// 参数化查询
async function getUserById(id: number) {
const result = await pool.query(
'SELECT * FROM users WHERE id = $1',
[id]
);
return result.rows[0];
}

// 事务
async function withTransaction<T>(
callback: (client: PoolClient) => Promise<T>
): Promise<T> {
const client = await pool.connect();

try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}

MongoDB 连接

import { MongoClient, Db, Collection, ObjectId } from 'mongodb';

const uri = 'mongodb://localhost:27017';
const client = new MongoClient(uri, {
maxPoolSize: 50, // 连接池大小
minPoolSize: 5,
maxIdleTimeMS: 30000
});

let db: Db;

// 连接
async function connect() {
await client.connect();
db = client.db('mydb');
console.log('Connected to MongoDB');
}

// CRUD 操作
interface User {
_id?: ObjectId;
name: string;
email: string;
createdAt: Date;
}

async function createUser(user: Omit<User, '_id'>) {
const collection: Collection<User> = db.collection('users');
const result = await collection.insertOne({
...user,
createdAt: new Date()
});
return result.insertedId;
}

async function findUsers(query: Partial<User> = {}) {
const collection: Collection<User> = db.collection('users');
return collection.find(query).toArray();
}

async function updateUser(id: string, update: Partial<User>) {
const collection: Collection<User> = db.collection('users');
return collection.updateOne(
{ _id: new ObjectId(id) },
{ $set: update }
);
}

// 聚合
async function getUserStats() {
const collection: Collection<User> = db.collection('users');
return collection.aggregate([
{
$group: {
_id: { $month: '$createdAt' },
count: { $sum: 1 }
}
},
{ $sort: { _id: 1 } }
]).toArray();
}

Redis 连接

import { createClient, RedisClientType } from 'redis';

const client: RedisClientType = createClient({
url: 'redis://localhost:6379',
socket: {
reconnectStrategy: (retries) => Math.min(retries * 50, 500)
}
});

client.on('error', (err) => console.error('Redis Error:', err));
client.on('connect', () => console.log('Redis connected'));

await client.connect();

// 基本操作
await client.set('key', 'value');
await client.setEx('key', 3600, 'value'); // 1小时过期
const value = await client.get('key');

// 哈希
await client.hSet('user:1', { name: 'Alice', age: '30' });
const user = await client.hGetAll('user:1');

// 列表
await client.lPush('queue', 'task1');
const task = await client.rPop('queue');

// 集合
await client.sAdd('tags', ['node', 'typescript']);
const tags = await client.sMembers('tags');

// 缓存模式
async function getWithCache<T>(
key: string,
fetcher: () => Promise<T>,
ttl: number = 3600
): Promise<T> {
const cached = await client.get(key);
if (cached) {
return JSON.parse(cached);
}

const data = await fetcher();
await client.setEx(key, ttl, JSON.stringify(data));
return data;
}

ORM 使用

Prisma

// schema.prisma
// generator client {
// provider = "prisma-client-js"
// }
//
// model User {
// id Int @id @default(autoincrement())
// email String @unique
// name String?
// posts Post[]
// createdAt DateTime @default(now())
// }
//
// model Post {
// id Int @id @default(autoincrement())
// title String
// author User @relation(fields: [authorId], references: [id])
// authorId Int
// }

import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// CRUD
async function createUser(email: string, name?: string) {
return prisma.user.create({
data: { email, name }
});
}

async function getUserWithPosts(id: number) {
return prisma.user.findUnique({
where: { id },
include: { posts: true }
});
}

async function updateUser(id: number, data: { name?: string }) {
return prisma.user.update({
where: { id },
data
});
}

// 事务
async function createUserWithPost(email: string, title: string) {
return prisma.$transaction(async (tx) => {
const user = await tx.user.create({ data: { email } });
const post = await tx.post.create({
data: { title, authorId: user.id }
});
return { user, post };
});
}

Mongoose(MongoDB ODM)

import mongoose, { Schema, Document, Model } from 'mongoose';

interface IUser extends Document {
name: string;
email: string;
createdAt: Date;
}

const userSchema = new Schema<IUser>({
name: { type: String, required: true },
email: { type: String, required: true, unique: true },
createdAt: { type: Date, default: Date.now }
});

// 添加索引
userSchema.index({ email: 1 });

// 方法
userSchema.methods.getProfile = function() {
return { name: this.name, email: this.email };
};

// 静态方法
userSchema.statics.findByEmail = function(email: string) {
return this.findOne({ email });
};

const User: Model<IUser> = mongoose.model('User', userSchema);

// 使用
await mongoose.connect('mongodb://localhost:27017/mydb');

const user = new User({ name: 'Alice', email: 'alice@example.com' });
await user.save();

const found = await User.findOne({ email: 'alice@example.com' });

连接池原理

参数说明建议值
max最大连接数CPU核数 × 2 ~ 4
min最小连接数0 ~ 5
idleTimeout空闲超时30s ~ 5min
connectionTimeout连接超时2s ~ 10s

常见面试问题

Q1: 什么是连接池?为什么需要连接池?

答案

连接池预先创建并维护一定数量的数据库连接,复用这些连接处理请求。

优点

  • 减少开销:避免频繁创建/销毁连接
  • 提高性能:复用已建立的连接
  • 控制资源:限制最大连接数
  • 提高稳定性:连接管理和健康检查
// 无连接池:每次请求新建连接
async function queryWithoutPool() {
const conn = await mysql.createConnection(config);
const result = await conn.query('SELECT 1');
await conn.end(); // 销毁连接
return result;
}

// 有连接池:复用连接
const pool = mysql.createPool(config);

async function queryWithPool() {
const result = await pool.query('SELECT 1');
return result; // 连接自动归还池
}

Q2: ORM 和原生 SQL 各有什么优缺点?

答案

特性ORM原生 SQL
开发效率
类型安全好(Prisma)需手动定义
性能有开销最优
灵活性较低最高
学习曲线需学习 API需学习 SQL
复杂查询可能受限完全支持
// ORM - 简洁但灵活性受限
const users = await prisma.user.findMany({
where: { age: { gte: 18 } },
include: { posts: true }
});

// 原生 SQL - 灵活但繁琐
const [users] = await pool.query(`
SELECT u.*, GROUP_CONCAT(p.title) as posts
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
WHERE u.age >= ?
GROUP BY u.id
`, [18]);

Q3: 如何处理数据库连接错误?

答案

// 重试机制
async function queryWithRetry<T>(
fn: () => Promise<T>,
maxRetries: number = 3
): Promise<T> {
let lastError: Error;

for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (err) {
lastError = err as Error;

// 判断是否可重试
if (!isRetryableError(err)) {
throw err;
}

// 指数退避
await sleep(Math.pow(2, i) * 100);
}
}

throw lastError!;
}

function isRetryableError(err: any): boolean {
const retryableCodes = [
'ECONNRESET',
'ETIMEDOUT',
'ECONNREFUSED',
'ER_LOCK_DEADLOCK'
];
return retryableCodes.includes(err.code);
}

// 健康检查
async function checkDatabaseHealth(): Promise<boolean> {
try {
await pool.query('SELECT 1');
return true;
} catch {
return false;
}
}

Q4: 如何优化数据库查询性能?

答案

方法说明
索引为常用查询字段添加索引
分页使用 LIMIT/OFFSET 或游标分页
投影只查询需要的字段
缓存Redis 缓存热点数据
批量操作减少往返次数
读写分离主从架构
// 游标分页(大数据量更高效)
async function getUsersWithCursor(cursor?: number, limit = 20) {
const where = cursor ? 'WHERE id > ?' : '';
const params = cursor ? [cursor, limit] : [limit];

const [rows] = await pool.query(
`SELECT id, name FROM users ${where} ORDER BY id LIMIT ?`,
params
);

return {
data: rows,
nextCursor: rows.length === limit ? rows[rows.length - 1].id : null
};
}

// 批量插入
async function batchInsert(users: User[]) {
const values = users.map(u => [u.name, u.email]);
await pool.query(
'INSERT INTO users (name, email) VALUES ?',
[values]
);
}

Q5: 什么是数据库事务?ACID 是什么?

答案

事务是一组原子操作,要么全部成功,要么全部失败。

特性含义
A (Atomicity)原子性:全部成功或全部失败
C (Consistency)一致性:数据始终保持有效状态
I (Isolation)隔离性:事务间互不干扰
D (Durability)持久性:提交后数据永久保存
// 转账事务示例
async function transfer(from: number, to: number, amount: number) {
const client = await pool.connect();

try {
await client.query('BEGIN');

// 检查余额
const { rows } = await client.query(
'SELECT balance FROM accounts WHERE id = $1 FOR UPDATE',
[from]
);

if (rows[0].balance < amount) {
throw new Error('余额不足');
}

// 扣款
await client.query(
'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
[amount, from]
);

// 入账
await client.query(
'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
[amount, to]
);

await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}

相关链接