MySQL分库分表(无中间件)
一、 分库分表方案设计
1. 垂直拆分 (按业务模块)
-- 原始单体数据库
CREATE DATABASE ecommerce;
USE ecommerce;
-- 垂直拆分后的数据库
-- 用户库
CREATE DATABASE user_center;
USE user_center;
CREATE TABLE users (
user_id BIGINT PRIMARY KEY,
username VARCHAR(50) UNIQUE,
email VARCHAR(100) UNIQUE,
password VARCHAR(100),
mobile VARCHAR(20),
status TINYINT DEFAULT 1,
created_at DATETIME,
updated_at DATETIME
);
CREATE TABLE user_profiles (
user_id BIGINT PRIMARY KEY,
real_name VARCHAR(50),
avatar VARCHAR(200),
gender TINYINT,
birthday DATE,
bio TEXT
);
CREATE TABLE user_address (
address_id BIGINT PRIMARY KEY,
user_id BIGINT,
province VARCHAR(50),
city VARCHAR(50),
district VARCHAR(50),
detail VARCHAR(200),
is_default TINYINT DEFAULT 0,
INDEX idx_user_id (user_id)
);
-- 订单库
CREATE DATABASE order_center;
USE order_center;
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
total_amount DECIMAL(10,2),
status TINYINT,
payment_status TINYINT,
created_at DATETIME,
paid_at DATETIME
);
CREATE TABLE order_items (
item_id BIGINT PRIMARY KEY,
order_id BIGINT,
product_id BIGINT,
product_name VARCHAR(100),
price DECIMAL(10,2),
quantity INT,
INDEX idx_order_id (order_id)
);
-- 商品库
CREATE DATABASE product_center;
USE product_center;
CREATE TABLE products (
product_id BIGINT PRIMARY KEY,
name VARCHAR(100),
category_id INT,
price DECIMAL(10,2),
stock INT,
status TINYINT,
created_at DATETIME
);
CREATE TABLE categories (
category_id INT PRIMARY KEY,
name VARCHAR(50),
parent_id INT
);
2. 水平拆分 (按数据量)
用户表水平分表(10个分表)
-- 在 user_center 数据库中创建分表
USE user_center;
-- 下面语句中的//为分隔符
DELIMITER //
CREATE PROCEDURE CreateUserShards()
BEGIN
DECLARE i INT DEFAULT 0;
WHILE i < 10 DO
SET @sql = CONCAT('
CREATE TABLE users_', i, ' (
user_id BIGINT PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100),
password VARCHAR(100),
mobile VARCHAR(20),
status TINYINT DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_username_', i, ' (username),
UNIQUE KEY uk_email_', i, ' (email),
INDEX idx_mobile_', i, ' (mobile),
INDEX idx_created_', i, ' (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
');
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET i = i + 1;
END WHILE;
END//
DELIMITER ;
CALL CreateUserShards();
订单表水平分表(按用户ID分表)
USE order_center;
DELIMITER //
CREATE PROCEDURE CreateOrderShards()
BEGIN
DECLARE i INT DEFAULT 0;
WHILE i < 10 DO
SET @sql = CONCAT('
CREATE TABLE orders_', i, ' (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
total_amount DECIMAL(10,2),
status TINYINT DEFAULT 1,
payment_status TINYINT DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
paid_at DATETIME,
INDEX idx_user_id_', i, ' (user_id),
INDEX idx_created_', i, ' (created_at),
INDEX idx_status_', i, ' (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
');
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- 创建对应的订单项分表
SET @sql = CONCAT('
CREATE TABLE order_items_', i, ' (
item_id BIGINT PRIMARY KEY,
order_id BIGINT NOT NULL,
product_id BIGINT,
product_name VARCHAR(100),
price DECIMAL(10,2),
quantity INT,
INDEX idx_order_id_', i, ' (order_id),
INDEX idx_product_id_', i, ' (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
');
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET i = i + 1;
END WHILE;
END//
DELIMITER ;
CALL CreateOrderShards();
二、程序中处理(PHP)
1. 基础配置类
<?php
// Config.php
class Config {
private static $instance;
private $config;
private function __construct() {
$this->config = [
'databases' => [
'user_center' => [
'host' => 'localhost',
'port' => 3306,
'username' => 'root',
'password' => 'password',
'dbname' => 'user_center'
],
'order_center' => [
'host' => 'localhost',
'port' => 3306,
'username' => 'root',
'password' => 'password',
'dbname' => 'order_center'
],
'product_center' => [
'host' => 'localhost',
'port' => 3306,
'username' => 'root',
'password' => 'password',
'dbname' => 'product_center'
]
],
'shard' => [
'user' => 10, // 用户表10个分片
'order' => 10 // 订单表10个分片
]
];
}
public static function getInstance() {
if (!self::$instance) {
self::$instance = new self();
}
return self::$instance;
}
public function get($key) {
$keys = explode('.', $key);
$value = $this->config;
foreach ($keys as $k) {
if (!isset($value[$k])) {
return null;
}
$value = $value[$k];
}
return $value;
}
}
2. 分片路由类
<?php
// ShardRouter.php
class ShardRouter {
private $shardConfig;
public function __construct() {
$config = Config::getInstance();
$this->shardConfig = $config->get('shard');
}
/**
* 根据用户ID计算分片
*/
public function getUserShard($userId) {
return $userId % $this->shardConfig['user'];
}
/**
* 根据订单ID计算分片(订单按用户ID分片)
*/
public function getOrderShard($orderId) {
// 假设订单ID包含用户ID信息
$userId = $this->extractUserIdFromOrderId($orderId);
return $userId % $this->shardConfig['order'];
}
/**
* 根据用户名计算分片(用于登录查询)
*/
public function getShardByString($str, $type = 'user') {
$shardCount = $this->shardConfig[$type];
return crc32($str) % $shardCount;
}
/**
* 生成分布式ID(雪花算法)
*/
public function generateId($shardId = 0, $timestamp = null) {
$timestamp = $timestamp ?? (int)(microtime(true) * 1000);
$workerId = $shardId % 1024;
$sequence = mt_rand(0, 4095);
return (($timestamp - 1577836800000) << 22)
| ($workerId << 12)
| $sequence;
}
/**
* 从订单ID提取用户ID(根据业务规则)
*/
private function extractUserIdFromOrderId($orderId) {
// 实际业务中需要根据ID生成规则来解析
// 这里简单返回订单ID的低位部分作为用户ID
return $orderId & 0x3FFFFFFF;
}
/**
* 获取表名
*/
public function getTableName($baseTable, $shardId) {
return $baseTable . '_' . $shardId;
}
}
3. 数据库连接管理
<?php
// DatabaseManager.php
class DatabaseManager {
private static $instance;
private $connections = [];
private $config;
private function __construct() {
$this->config = Config::getInstance();
}
public static function getInstance() {
if (!self::$instance) {
self::$instance = new self();
}
return self::$instance;
}
/**
* 获取数据库连接
*/
public function getConnection($database, $shardId = null) {
$key = $database . ($shardId !== null ? '_' . $shardId : '');
if (!isset($this->connections[$key])) {
$dbConfig = $this->config->get("databases.{$database}");
if (!$dbConfig) {
throw new Exception("Database config not found: {$database}");
}
try {
$dsn = "mysql:host={$dbConfig['host']};port={$dbConfig['port']};dbname={$dbConfig['dbname']};charset=utf8mb4";
$this->connections[$key] = new PDO(
$dsn,
$dbConfig['username'],
$dbConfig['password'],
[
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_EMULATE_PREPARES => false,
PDO::ATTR_PERSISTENT => false
]
);
} catch (PDOException $e) {
throw new Exception("Database connection failed: " . $e->getMessage());
}
}
return $this->connections[$key];
}
/**
* 开始事务(单分片事务)
*/
public function beginTransaction($database, $shardId = null) {
$connection = $this->getConnection($database, $shardId);
return $connection->beginTransaction();
}
/**
* 提交事务
*/
public function commit($database, $shardId = null) {
$connection = $this->getConnection($database, $shardId);
return $connection->commit();
}
/**
* 回滚事务
*/
public function rollback($database, $shardId = null) {
$connection = $this->getConnection($database, $shardId);
return $connection->rollback();
}
}
4. 用户服务类示例(完整CRUD)
<?php
// UserService.php
class UserService {
private $db;
private $router;
public function __construct() {
$this->db = DatabaseManager::getInstance();
$this->router = new ShardRouter();
}
/**
* 创建用户(写入操作)
*/
public function createUser($userData) {
// 生成用户ID
$userId = $this->router->generateId();
$shardId = $this->router->getUserShard($userId);
$tableName = $this->router->getTableName('users', $shardId);
$connection = $this->db->getConnection('user_center', $shardId);
try {
// 开始事务
$this->db->beginTransaction('user_center', $shardId);
// 插入用户基础信息
$sql = "INSERT INTO {$tableName}
(user_id, username, email, password, mobile, created_at)
VALUES (?, ?, ?, ?, ?, NOW())";
$stmt = $connection->prepare($sql);
$stmt->execute([
$userId,
$userData['username'],
$userData['email'],
password_hash($userData['password'], PASSWORD_DEFAULT),
$userData['mobile'] ?? null
]);
// 插入用户档案(垂直分库,但在同一个事务中)
$this->createUserProfile($userId, $userData);
// 提交事务
$this->db->commit('user_center', $shardId);
return $userId;
} catch (Exception $e) {
$this->db->rollback('user_center', $shardId);
throw new Exception("创建用户失败: " . $e->getMessage());
}
}
/**
* 更新用户信息
*/
public function updateUser($userId, $updateData) {
$shardId = $this->router->getUserShard($userId);
$tableName = $this->router->getTableName('users', $shardId);
$connection = $this->db->getConnection('user_center', $shardId);
$allowedFields = ['email', 'mobile', 'password'];
$setParts = [];
$params = [];
foreach ($updateData as $field => $value) {
if (in_array($field, $allowedFields)) {
if ($field === 'password') {
$value = password_hash($value, PASSWORD_DEFAULT);
}
$setParts[] = "{$field} = ?";
$params[] = $value;
}
}
if (empty($setParts)) {
throw new Exception("没有有效的更新字段");
}
$params[] = $userId;
$sql = "UPDATE {$tableName} SET " . implode(', ', $setParts) . " WHERE user_id = ?";
$stmt = $connection->prepare($sql);
return $stmt->execute($params);
}
/**
* 根据用户ID查询(精确查询)
*/
public function getUserById($userId) {
$shardId = $this->router->getUserShard($userId);
$tableName = $this->router->getTableName('users', $shardId);
$connection = $this->db->getConnection('user_center', $shardId);
$sql = "SELECT * FROM {$tableName} WHERE user_id = ?";
$stmt = $connection->prepare($sql);
$stmt->execute([$userId]);
$user = $stmt->fetch();
if ($user) {
$user['profile'] = $this->getUserProfile($userId);
$user['addresses'] = $this->getUserAddresses($userId);
}
return $user;
}
/**
* 根据用户名查询(需要计算分片)
*/
public function getUserByUsername($username) {
$shardId = $this->router->getShardByString($username, 'user');
$tableName = $this->router->getTableName('users', $shardId);
$connection = $this->db->getConnection('user_center', $shardId);
$sql = "SELECT * FROM {$tableName} WHERE username = ?";
$stmt = $connection->prepare($sql);
$stmt->execute([$username]);
return $stmt->fetch();
}
/**
* 批量查询用户信息
*/
public function getUsersByIds($userIds) {
$usersByShard = [];
// 按分片分组用户ID
foreach ($userIds as $userId) {
$shardId = $this->router->getUserShard($userId);
$usersByShard[$shardId][] = $userId;
}
$results = [];
foreach ($usersByShard as $shardId => $shardUserIds) {
$tableName = $this->router->getTableName('users', $shardId);
$connection = $this->db->getConnection('user_center', $shardId);
$placeholders = str_repeat('?,', count($shardUserIds) - 1) . '?';
$sql = "SELECT * FROM {$tableName} WHERE user_id IN ({$placeholders})";
$stmt = $connection->prepare($sql);
$stmt->execute($shardUserIds);
$results = array_merge($results, $stmt->fetchAll());
}
return $results;
}
/**
* 复杂查询:按条件搜索用户(需要遍历分片)
*/
public function searchUsers($conditions, $page = 1, $pageSize = 20) {
$allUsers = [];
$shardCount = Config::getInstance()->get('shard.user');
for ($shardId = 0; $shardId < $shardCount; $shardId++) {
$tableName = $this->router->getTableName('users', $shardId);
$connection = $this->db->getConnection('user_center', $shardId);
$where = [];
$params = [];
if (!empty($conditions['username'])) {
$where[] = "username LIKE ?";
$params[] = $conditions['username'] . '%';
}
if (!empty($conditions['email'])) {
$where[] = "email LIKE ?";
$params[] = $conditions['email'] . '%';
}
if (!empty($conditions['mobile'])) {
$where[] = "mobile LIKE ?";
$params[] = $conditions['mobile'] . '%';
}
$whereClause = $where ? "WHERE " . implode(' AND ', $where) : "";
$offset = ($page - 1) * $pageSize;
$sql = "SELECT * FROM {$tableName} {$whereClause} LIMIT ? OFFSET ?";
$params[] = $pageSize;
$params[] = $offset;
$stmt = $connection->prepare($sql);
$stmt->execute($params);
$shardUsers = $stmt->fetchAll();
$allUsers = array_merge($allUsers, $shardUsers);
}
// 在PHP中进行排序和分页
usort($allUsers, function($a, $b) {
return strcmp($b['created_at'], $a['created_at']);
});
return array_slice($allUsers, 0, $pageSize);
}
/**
* 删除用户
*/
public function deleteUser($userId) {
$shardId = $this->router->getUserShard($userId);
$tableName = $this->router->getTableName('users', $shardId);
$connection = $this->db->getConnection('user_center', $shardId);
try {
$this->db->beginTransaction('user_center', $shardId);
// 删除用户基础信息
$sql = "DELETE FROM {$tableName} WHERE user_id = ?";
$stmt = $connection->prepare($sql);
$stmt->execute([$userId]);
// 删除关联数据
$this->deleteUserProfile($userId);
$this->deleteUserAddresses($userId);
$this->db->commit('user_center', $shardId);
return true;
} catch (Exception $e) {
$this->db->rollback('user_center', $shardId);
throw new Exception("删除用户失败: " . $e->getMessage());
}
}
// 用户档案相关方法
private function createUserProfile($userId, $userData) {
$connection = $this->db->getConnection('user_center');
$sql = "INSERT INTO user_profiles
(user_id, real_name, avatar, gender, birthday, bio)
VALUES (?, ?, ?, ?, ?, ?)";
$stmt = $connection->prepare($sql);
return $stmt->execute([
$userId,
$userData['real_name'] ?? '',
$userData['avatar'] ?? '',
$userData['gender'] ?? 0,
$userData['birthday'] ?? null,
$userData['bio'] ?? ''
]);
}
private function getUserProfile($userId) {
$connection = $this->db->getConnection('user_center');
$sql = "SELECT * FROM user_profiles WHERE user_id = ?";
$stmt = $connection->prepare($sql);
$stmt->execute([$userId]);
return $stmt->fetch();
}
private function getUserAddresses($userId) {
$connection = $this->db->getConnection('user_center');
$sql = "SELECT * FROM user_address WHERE user_id = ? ORDER BY is_default DESC";
$stmt = $connection->prepare($sql);
$stmt->execute([$userId]);
return $stmt->fetchAll();
}
private function deleteUserProfile($userId) {
$connection = $this->db->getConnection('user_center');
$sql = "DELETE FROM user_profiles WHERE user_id = ?";
$stmt = $connection->prepare($sql);
return $stmt->execute([$userId]);
}
private function deleteUserAddresses($userId) {
$connection = $this->db->getConnection('user_center');
$sql = "DELETE FROM user_address WHERE user_id = ?";
$stmt = $connection->prepare($sql);
return $stmt->execute([$userId]);
}
}
5. 订单服务类示例
<?php
// OrderService.php
class OrderService {
private $db;
private $router;
public function __construct() {
$this->db = DatabaseManager::getInstance();
$this->router = new ShardRouter();
}
/**
* 创建订单
*/
public function createOrder($orderData) {
$userId = $orderData['user_id'];
$shardId = $this->router->getUserShard($userId);
$orderId = $this->router->generateId($shardId);
$orderTable = $this->router->getTableName('orders', $shardId);
$itemTable = $this->router->getTableName('order_items', $shardId);
$connection = $this->db->getConnection('order_center', $shardId);
try {
$this->db->beginTransaction('order_center', $shardId);
// 插入订单主表
$sql = "INSERT INTO {$orderTable}
(order_id, user_id, total_amount, status, payment_status, created_at)
VALUES (?, ?, ?, 1, 0, NOW())";
$stmt = $connection->prepare($sql);
$stmt->execute([
$orderId,
$userId,
$orderData['total_amount']
]);
// 插入订单项
foreach ($orderData['items'] as $item) {
$itemId = $this->router->generateId($shardId);
$sql = "INSERT INTO {$itemTable}
(item_id, order_id, product_id, product_name, price, quantity)
VALUES (?, ?, ?, ?, ?, ?)";
$stmt = $connection->prepare($sql);
$stmt->execute([
$itemId,
$orderId,
$item['product_id'],
$item['product_name'],
$item['price'],
$item['quantity']
]);
}
$this->db->commit('order_center', $shardId);
return $orderId;
} catch (Exception $e) {
$this->db->rollback('order_center', $shardId);
throw new Exception("创建订单失败: " . $e->getMessage());
}
}
/**
* 更新订单状态
*/
public function updateOrderStatus($orderId, $status) {
$shardId = $this->router->getOrderShard($orderId);
$tableName = $this->router->getTableName('orders', $shardId);
$connection = $this->db->getConnection('order_center', $shardId);
$sql = "UPDATE {$tableName} SET status = ?, updated_at = NOW() WHERE order_id = ?";
$stmt = $connection->prepare($sql);
return $stmt->execute([$status, $orderId]);
}
/**
* 根据订单ID查询
*/
public function getOrderById($orderId) {
$shardId = $this->router->getOrderShard($orderId);
$orderTable = $this->router->getTableName('orders', $shardId);
$itemTable = $this->router->getTableName('order_items', $shardId);
$connection = $this->db->getConnection('order_center', $shardId);
// 查询订单基本信息
$sql = "SELECT * FROM {$orderTable} WHERE order_id = ?";
$stmt = $connection->prepare($sql);
$stmt->execute([$orderId]);
$order = $stmt->fetch();
if ($order) {
// 查询订单项
$sql = "SELECT * FROM {$itemTable} WHERE order_id = ?";
$stmt = $connection->prepare($sql);
$stmt->execute([$orderId]);
$order['items'] = $stmt->fetchAll();
}
return $order;
}
/**
* 查询用户订单列表
*/
public function getUserOrders($userId, $page = 1, $pageSize = 20) {
$shardId = $this->router->getUserShard($userId);
$tableName = $this->router->getTableName('orders', $shardId);
$connection = $this->db->getConnection('order_center', $shardId);
$offset = ($page - 1) * $pageSize;
$sql = "SELECT * FROM {$tableName}
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?";
$stmt = $connection->prepare($sql);
$stmt->execute([$userId, $pageSize, $offset]);
return $stmt->fetchAll();
}
}
6. 使用示例
6.1 用户注册和订单创建
<?php
// register_and_order.php
require_once 'Config.php';
require_once 'ShardRouter.php';
require_once 'DatabaseManager.php';
require_once 'UserService.php';
require_once 'OrderService.php';
try {
// 初始化服务
$userService = new UserService();
$orderService = new OrderService();
// 1. 创建用户
$userData = [
'username' => 'zhangsan',
'email' => 'zhangsan@example.com',
'password' => '123456',
'mobile' => '13800138000',
'real_name' => '张三',
'avatar' => 'avatar.jpg'
];
$userId = $userService->createUser($userData);
echo "用户创建成功,ID: {$userId}\n";
// 2. 为用户创建订单
$orderData = [
'user_id' => $userId,
'total_amount' => 299.00,
'items' => [
[
'product_id' => 1001,
'product_name' => 'iPhone 14',
'price' => 299.00,
'quantity' => 1
]
]
];
$orderId = $orderService->createOrder($orderData);
echo "订单创建成功,ID: {$orderId}\n";
// 3. 查询用户信息
$user = $userService->getUserById($userId);
echo "用户信息: " . json_encode($user, JSON_UNESCAPED_UNICODE) . "\n";
// 4. 查询订单信息
$order = $orderService->getOrderById($orderId);
echo "订单信息: " . json_encode($order, JSON_UNESCAPED_UNICODE) . "\n";
// 5. 查询用户订单列表
$orders = $orderService->getUserOrders($userId);
echo "用户订单数量: " . count($orders) . "\n";
} catch (Exception $e) {
echo "操作失败: " . $e->getMessage() . "\n";
}
6.2 批量操作示例
<?php
// batch_operations.php
require_once 'UserService.php';
$userService = new UserService();
// 批量查询用户信息
$userIds = [123456, 234567, 345678, 456789];
$users = $userService->getUsersByIds($userIds);
echo "批量查询到 " . count($users) . " 个用户\n";
// 复杂搜索
$conditions = [
'username' => 'zhang',
'mobile' => '138'
];
$users = $userService->searchUsers($conditions, 1, 10);
echo "搜索到 " . count($users) . " 个用户\n";
// 更新用户信息
$updateResult = $userService->updateUser(123456, [
'email' => 'new_email@example.com',
'mobile' => '13900139000'
]);
echo "更新结果: " . ($updateResult ? '成功' : '失败') . "\n";
三、 总结
写入操作:
先计算分片:根据分片键(如用户ID)计算目标分片
获取对应连接:连接到正确的数据库和分表
执行写入:在目标分片上执行INSERT/UPDATE
事务管理:同一分片内的事务可以保证一致性
查询操作:
精确查询:通过分片键直接定位到具体分片
批量查询:按分片分组,分别查询后合并结果
复杂查询:遍历所有分片,在PHP层进行结果聚合
跨库查询:避免跨分片JOIN,通过多次查询在应用层组装
更新操作:
定位分片:通过分片键找到数据所在分片
单分片更新:在单个分片内完成更新
批量更新:按分片分组执行
删除操作:
定位分片:找到数据所在分片
事务处理:删除主表和关联表数据
级联删除:在应用层处理关联数据删除
说明
这种方案虽然增加了应用层的复杂性,但避免了中间件的依赖,提供了更大的灵活性。
在实际应用中,还需要考虑连接池、缓存、监控等配套措施。