MySQL分库分表(无中间件)

一、 分库分表方案设计

1. 垂直拆分 (按业务模块)

-- Original monolithic database (the starting point before any splitting).
-- utf8mb4 is declared explicitly so stored text does not depend on the
-- server's default character set; IF NOT EXISTS makes the script re-runnable.
CREATE DATABASE IF NOT EXISTS ecommerce
    DEFAULT CHARACTER SET utf8mb4;
USE ecommerce;

-- Databases after the vertical split
-- User database
-- utf8mb4 is declared explicitly (database default and per table) so that
-- non-ASCII text such as real names is stored correctly regardless of the
-- server's default character set. IF NOT EXISTS keeps the script re-runnable.
CREATE DATABASE IF NOT EXISTS user_center
    DEFAULT CHARACTER SET utf8mb4;
USE user_center;

-- Account data: one row per user, keyed by user_id.
CREATE TABLE IF NOT EXISTS users (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50) UNIQUE,
    email VARCHAR(100) UNIQUE,
    password VARCHAR(100),
    mobile VARCHAR(20),
    status TINYINT DEFAULT 1,
    created_at DATETIME,
    updated_at DATETIME
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Optional profile details: at most one row per user (user_id is the key).
CREATE TABLE IF NOT EXISTS user_profiles (
    user_id BIGINT PRIMARY KEY,
    real_name VARCHAR(50),
    avatar VARCHAR(200),
    gender TINYINT,
    birthday DATE,
    bio TEXT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Postal addresses: many rows per user, looked up through idx_user_id.
CREATE TABLE IF NOT EXISTS user_address (
    address_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    province VARCHAR(50),
    city VARCHAR(50),
    district VARCHAR(50),
    detail VARCHAR(200),
    is_default TINYINT DEFAULT 0,
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Order database
-- utf8mb4 is declared explicitly so product names are stored correctly
-- regardless of the server's default character set.
CREATE DATABASE IF NOT EXISTS order_center
    DEFAULT CHARACTER SET utf8mb4;
USE order_center;

-- Order header: one row per order.
CREATE TABLE IF NOT EXISTS orders (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    total_amount DECIMAL(10,2),
    status TINYINT,
    payment_status TINYINT,
    created_at DATETIME,
    paid_at DATETIME,
    -- Orders are listed per user, so user_id is indexed here as it is in the
    -- orders_N shard tables.
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Order lines: many rows per order, looked up through idx_order_id.
-- product_name and price are stored on the line itself.
CREATE TABLE IF NOT EXISTS order_items (
    item_id BIGINT PRIMARY KEY,
    order_id BIGINT,
    product_id BIGINT,
    product_name VARCHAR(100),
    price DECIMAL(10,2),
    quantity INT,
    INDEX idx_order_id (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Product database
-- utf8mb4 is declared explicitly so product and category names are stored
-- correctly regardless of the server's default character set.
CREATE DATABASE IF NOT EXISTS product_center
    DEFAULT CHARACTER SET utf8mb4;
USE product_center;

-- Product catalogue: one row per product.
CREATE TABLE IF NOT EXISTS products (
    product_id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    category_id INT,
    price DECIMAL(10,2),
    stock INT,
    status TINYINT,
    created_at DATETIME
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Category tree: parent_id refers to another category_id
-- (presumably NULL or 0 for top-level categories; confirm with the writer).
CREATE TABLE IF NOT EXISTS categories (
    category_id INT PRIMARY KEY,
    name VARCHAR(50),
    parent_id INT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2. 水平拆分 (按数据量)

用户表水平分表(10个分表)

-- Create the shard tables inside the user_center database
USE user_center;

-- "//" is the statement delimiter while the procedure body is being defined
DELIMITER //
-- Drop any earlier definition so this script can be run more than once.
DROP PROCEDURE IF EXISTS CreateUserShards//
-- Creates the ten user shard tables users_0 .. users_9 with dynamic SQL.
-- The count (10) must match the 'shard.user' value in the PHP Config class.
-- Tables that already exist are left untouched (IF NOT EXISTS).
-- Note: the UNIQUE keys are per table, so they enforce uniqueness of
-- username/email only within one shard, not across all shards.
CREATE PROCEDURE CreateUserShards()
BEGIN
    DECLARE shard_no INT DEFAULT 0;
    WHILE shard_no < 10 DO
        -- PREPARE only accepts a user variable or literal, hence @sql.
        SET @sql = CONCAT('
            CREATE TABLE IF NOT EXISTS users_', shard_no, ' (
                user_id BIGINT PRIMARY KEY,
                username VARCHAR(50) NOT NULL,
                email VARCHAR(100),
                password VARCHAR(100),
                mobile VARCHAR(20),
                status TINYINT DEFAULT 1,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uk_username_', shard_no, ' (username),
                UNIQUE KEY uk_email_', shard_no, ' (email),
                INDEX idx_mobile_', shard_no, ' (mobile),
                INDEX idx_created_', shard_no, ' (created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        SET shard_no = shard_no + 1;
    END WHILE;
END//
DELIMITER ;

CALL CreateUserShards();

订单表水平分表(按用户ID分表)

USE order_center;

DELIMITER //
-- Drop any earlier definition so this script can be run more than once.
DROP PROCEDURE IF EXISTS CreateOrderShards//
-- Creates ten pairs of shard tables: orders_0 .. orders_9 and
-- order_items_0 .. order_items_9. An order and its items share the same
-- suffix, so both can be written in one single-connection transaction.
-- The count (10) must match the 'shard.order' value in the PHP Config class.
-- Tables that already exist are left untouched (IF NOT EXISTS).
CREATE PROCEDURE CreateOrderShards()
BEGIN
    DECLARE shard_no INT DEFAULT 0;
    WHILE shard_no < 10 DO
        -- Order header shard. updated_at is maintained by MySQL through
        -- ON UPDATE CURRENT_TIMESTAMP, so every row change is timestamped.
        SET @sql = CONCAT('
            CREATE TABLE IF NOT EXISTS orders_', shard_no, ' (
                order_id BIGINT PRIMARY KEY,
                user_id BIGINT NOT NULL,
                total_amount DECIMAL(10,2),
                status TINYINT DEFAULT 1,
                payment_status TINYINT DEFAULT 0,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                paid_at DATETIME,
                INDEX idx_user_id_', shard_no, ' (user_id),
                INDEX idx_created_', shard_no, ' (created_at),
                INDEX idx_status_', shard_no, ' (status)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        -- Matching order-item shard
        SET @sql = CONCAT('
            CREATE TABLE IF NOT EXISTS order_items_', shard_no, ' (
                item_id BIGINT PRIMARY KEY,
                order_id BIGINT NOT NULL,
                product_id BIGINT,
                product_name VARCHAR(100),
                price DECIMAL(10,2),
                quantity INT,
                INDEX idx_order_id_', shard_no, ' (order_id),
                INDEX idx_product_id_', shard_no, ' (product_id)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        SET shard_no = shard_no + 1;
    END WHILE;
END//
DELIMITER ;

CALL CreateOrderShards();

二、应用层实现(PHP)

1. 基础配置类

<?php
// Config.php
class Config {
    /** @var Config|null The single shared instance, created on first use. */
    private static $instance;

    /** @var array Nested settings tree addressed with dot-separated keys. */
    private $config;

    /**
     * Build the in-memory settings tree. Private: obtain the object through
     * getInstance().
     */
    private function __construct() {
        // The three logical databases use identical connection settings and
        // differ only in the schema name, so the entries are generated.
        $databases = [];
        foreach (['user_center', 'order_center', 'product_center'] as $schema) {
            $databases[$schema] = [
                'host' => 'localhost',
                'port' => 3306,
                'username' => 'root',
                'password' => 'password',
                'dbname' => $schema
            ];
        }

        $this->config = [
            'databases' => $databases,
            'shard' => [
                'user' => 10,     // number of user table shards
                'order' => 10     // number of order table shards
            ]
        ];
    }

    /**
     * Return the shared Config object, creating it on the first call.
     *
     * @return Config
     */
    public static function getInstance() {
        if (self::$instance) {
            return self::$instance;
        }

        self::$instance = new self();
        return self::$instance;
    }

    /**
     * Look up a setting by a dot-separated path, e.g. "databases.user_center".
     *
     * @param string $key Path segments joined by "."
     * @return mixed The value found at that path, or null as soon as a
     *               segment is not set (isset() semantics).
     */
    public function get($key) {
        $node = $this->config;

        foreach (explode('.', $key) as $segment) {
            if (!isset($node[$segment])) {
                return null;
            }
            $node = $node[$segment];
        }

        return $node;
    }
}

2. 分片路由类

<?php
// ShardRouter.php
class ShardRouter {
    /** Custom epoch subtracted from the millisecond timestamp (2020-01-01 00:00:00 UTC). */
    const EPOCH_MS = 1577836800000;
    /** Width of the per-millisecond sequence field (lowest bits of an ID). */
    const SEQUENCE_BITS = 12;
    /** Largest sequence value (4095). */
    const SEQUENCE_MASK = 0xFFF;
    /** Largest worker/shard value that fits the 10-bit worker field (1023). */
    const WORKER_MASK = 0x3FF;

    /** @var array Shard counts keyed by entity type ('user', 'order'). */
    private $shardConfig;

    /** @var int|null Millisecond timestamp used by the previous generateId() call. */
    private static $lastTimestamp = null;
    /** @var int Sequence value handed out by the previous generateId() call. */
    private static $sequence = 0;
    /** @var int Number of IDs already issued for $lastTimestamp. */
    private static $issuedInMs = 0;

    public function __construct() {
        $config = Config::getInstance();
        $this->shardConfig = $config->get('shard');
    }

    /**
     * Shard number for a user: user ID modulo the configured user shard count.
     *
     * @param int $userId
     * @return int
     */
    public function getUserShard($userId) {
        return $userId % $this->shardConfig['user'];
    }

    /**
     * Shard number for an order, recovered from the order ID itself.
     *
     * Orders are stored in the shard of their owning user, and the creator
     * passes that shard number to generateId(), which embeds it in the
     * worker field of the ID. Decoding that field therefore yields the shard
     * the order was written to, without knowing the user ID.
     *
     * Only valid for IDs produced by generateId($shardId); requires the
     * configured shard count to be at most 1024.
     *
     * @param int $orderId
     * @return int
     */
    public function getOrderShard($orderId) {
        $shardId = $this->extractShardIdFromOrderId($orderId);
        return $shardId % $this->shardConfig['order'];
    }

    /**
     * Hash an arbitrary string onto a shard number (crc32 modulo shard count).
     *
     * NOTE: this only locates a row if the writer placed the row using the
     * same string hash. Rows placed by numeric ID (getUserShard) cannot be
     * found this way.
     *
     * @param string $str
     * @param string $type Key into the shard configuration ('user' or 'order')
     * @return int
     */
    public function getShardByString($str, $type = 'user') {
        $shardCount = $this->shardConfig[$type];
        return crc32($str) % $shardCount;
    }

    /**
     * Generate a snowflake-style 64-bit ID:
     * (milliseconds since EPOCH_MS) << 22 | (shardId % 1024) << 12 | sequence.
     *
     * The sequence starts at a random value for each new millisecond (which
     * keeps "ID % shardCount" evenly spread) and is incremented for further
     * IDs in the same millisecond, so IDs generated by one PHP process never
     * repeat. When the clock is used and all 4096 values of a millisecond
     * are exhausted, the method waits for the next millisecond.
     *
     * Uniqueness across concurrent PHP processes is NOT guaranteed by this
     * method; it relies on the random starting sequence for that.
     *
     * @param int      $shardId   Value stored in the 10-bit worker field
     * @param int|null $timestamp Milliseconds since the Unix epoch; null = now.
     *                            Callers passing an explicit value must not
     *                            request more than 4096 IDs for it.
     * @return int
     */
    public function generateId($shardId = 0, $timestamp = null) {
        if ($timestamp === null) {
            $timestamp = self::currentMillis();
            // Sequence space for this millisecond is used up: wait for the clock.
            while ($timestamp === self::$lastTimestamp
                && self::$issuedInMs > self::SEQUENCE_MASK) {
                $timestamp = self::currentMillis();
            }
        } else {
            $timestamp = (int)$timestamp;
        }

        if ($timestamp === self::$lastTimestamp) {
            self::$sequence = (self::$sequence + 1) & self::SEQUENCE_MASK;
            self::$issuedInMs++;
        } else {
            self::$lastTimestamp = $timestamp;
            self::$sequence = mt_rand(0, self::SEQUENCE_MASK);
            self::$issuedInMs = 1;
        }

        $workerId = $shardId % 1024;

        return (($timestamp - self::EPOCH_MS) << 22)
             | ($workerId << self::SEQUENCE_BITS)
             | self::$sequence;
    }

    /**
     * Read the worker field (bits 12..21) back out of a generated ID.
     */
    private function extractShardIdFromOrderId($orderId) {
        return (((int)$orderId) >> self::SEQUENCE_BITS) & self::WORKER_MASK;
    }

    /**
     * Current wall-clock time in whole milliseconds.
     */
    private static function currentMillis() {
        return (int)(microtime(true) * 1000);
    }

    /**
     * Physical table name for a shard, e.g. ('users', 3) => 'users_3'.
     *
     * @param string $baseTable
     * @param int    $shardId
     * @return string
     */
    public function getTableName($baseTable, $shardId) {
        return $baseTable . '_' . $shardId;
    }
}

3. 数据库连接管理

<?php
// DatabaseManager.php
class DatabaseManager {
    /** @var DatabaseManager|null The single shared instance. */
    private static $instance;
    /** @var PDO[] Open connections keyed by "<database>" or "<database>_<shardId>". */
    private $connections = [];
    /** @var Config */
    private $config;

    private function __construct() {
        $this->config = Config::getInstance();
    }

    /**
     * Return the shared manager, creating it on the first call.
     *
     * @return DatabaseManager
     */
    public static function getInstance() {
        if (!self::$instance) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * Get (and lazily open) a PDO connection.
     *
     * Connections are cached per (database, shardId) pair. A call with a
     * shard ID and a call without one therefore return DIFFERENT PDO objects,
     * even though both use the same "databases.<database>" settings; a
     * transaction started on one does not cover statements run on the other.
     *
     * @param string   $database Key under "databases" in Config
     * @param int|null $shardId  Only used to select the cache slot
     * @return PDO
     * @throws Exception If the database is not configured or connecting fails
     */
    public function getConnection($database, $shardId = null) {
        $key = $database . ($shardId !== null ? '_' . $shardId : '');

        if (!isset($this->connections[$key])) {
            $dbConfig = $this->config->get("databases.{$database}");

            if (!$dbConfig) {
                throw new Exception("Database config not found: {$database}");
            }

            try {
                $dsn = "mysql:host={$dbConfig['host']};port={$dbConfig['port']};dbname={$dbConfig['dbname']};charset=utf8mb4";

                $this->connections[$key] = new PDO(
                    $dsn,
                    $dbConfig['username'],
                    $dbConfig['password'],
                    [
                        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
                        PDO::ATTR_EMULATE_PREPARES => false,
                        PDO::ATTR_PERSISTENT => false
                    ]
                );
            } catch (PDOException $e) {
                // Keep the PDOException as "previous" so the cause is not lost.
                // Its code may be a string (SQLSTATE), so 0 is used as the code.
                throw new Exception("Database connection failed: " . $e->getMessage(), 0, $e);
            }
        }

        return $this->connections[$key];
    }

    /**
     * Begin a transaction on the connection for (database, shardId).
     *
     * @return bool
     */
    public function beginTransaction($database, $shardId = null) {
        $connection = $this->getConnection($database, $shardId);
        return $connection->beginTransaction();
    }

    /**
     * Commit the transaction on the connection for (database, shardId).
     *
     * @return bool
     */
    public function commit($database, $shardId = null) {
        $connection = $this->getConnection($database, $shardId);
        return $connection->commit();
    }

    /**
     * Roll back the transaction on the connection for (database, shardId).
     *
     * Safe to call from a catch block: when no transaction is active (for
     * example because beginTransaction() or commit() was the call that
     * failed) it returns false instead of throwing and hiding the original
     * error.
     *
     * @return bool True if a transaction was rolled back, false if none was active
     */
    public function rollback($database, $shardId = null) {
        $connection = $this->getConnection($database, $shardId);

        if (!$connection->inTransaction()) {
            return false;
        }

        return $connection->rollBack();
    }
}

4. 用户服务类示例(完整CRUD)

<?php
// UserService.php
class UserService {
    /** @var DatabaseManager Shared connection manager. */
    private $db;
    /** @var ShardRouter Computes shard numbers, table names and IDs. */
    private $router;

    public function __construct() {
        $this->db = DatabaseManager::getInstance();
        $this->router = new ShardRouter();
    }

    /**
     * Create a user: one row in the users_N shard table plus one row in
     * user_profiles, written atomically.
     *
     * Both tables live in the user_center database, so both statements are
     * run on the SAME PDO connection; only then does the transaction cover
     * the profile insert as well.
     *
     * NOTE(review): the UNIQUE keys on username/email exist per shard table
     * only, so this method does not prevent the same username being created
     * in two different shards.
     *
     * @param array $userData Keys: username, email, password; optional mobile,
     *                        real_name, avatar, gender, birthday, bio
     * @return int The generated user ID
     * @throws Exception If any statement fails (the transaction is rolled back)
     */
    public function createUser($userData) {
        $userId = $this->router->generateId();
        $shardId = $this->router->getUserShard($userId);

        $tableName = $this->router->getTableName('users', $shardId);
        $connection = $this->db->getConnection('user_center', $shardId);

        try {
            $connection->beginTransaction();

            $sql = "INSERT INTO {$tableName} 
                    (user_id, username, email, password, mobile, created_at) 
                    VALUES (?, ?, ?, ?, ?, NOW())";

            $stmt = $connection->prepare($sql);
            $stmt->execute([
                $userId,
                $userData['username'],
                $userData['email'],
                password_hash($userData['password'], PASSWORD_DEFAULT),
                $userData['mobile'] ?? null
            ]);

            // Same connection => same transaction as the insert above.
            $this->createUserProfile($userId, $userData, $connection);

            $connection->commit();

            return $userId;

        } catch (Exception $e) {
            // Guarded: rolling back without an active transaction would throw
            // and hide the original error.
            if ($connection->inTransaction()) {
                $connection->rollBack();
            }
            throw new Exception("创建用户失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Update selected columns of a user row.
     *
     * Only email, mobile and password may be changed; other keys in
     * $updateData are ignored. A password is hashed before it is stored.
     *
     * @param int   $userId
     * @param array $updateData column => new value
     * @return bool Result of PDOStatement::execute()
     * @throws Exception If $updateData contains no updatable column
     */
    public function updateUser($userId, $updateData) {
        $shardId = $this->router->getUserShard($userId);
        $tableName = $this->router->getTableName('users', $shardId);
        $connection = $this->db->getConnection('user_center', $shardId);

        $allowedFields = ['email', 'mobile', 'password'];
        $setParts = [];
        $params = [];

        foreach ($updateData as $field => $value) {
            // Strict comparison: a numeric key such as 0 must not match a
            // column name through loose comparison, because $field is
            // interpolated into the SQL text below.
            if (!in_array($field, $allowedFields, true)) {
                continue;
            }
            if ($field === 'password') {
                $value = password_hash($value, PASSWORD_DEFAULT);
            }
            $setParts[] = "{$field} = ?";
            $params[] = $value;
        }

        if (empty($setParts)) {
            throw new Exception("没有有效的更新字段");
        }

        $params[] = $userId;

        $sql = "UPDATE {$tableName} SET " . implode(', ', $setParts) . " WHERE user_id = ?";
        $stmt = $connection->prepare($sql);

        return $stmt->execute($params);
    }

    /**
     * Load a user by ID (the shard key), including profile and addresses.
     *
     * @param int $userId
     * @return array|false The user row with extra 'profile' and 'addresses'
     *                     keys, or false if no such user exists
     */
    public function getUserById($userId) {
        $shardId = $this->router->getUserShard($userId);
        $tableName = $this->router->getTableName('users', $shardId);
        $connection = $this->db->getConnection('user_center', $shardId);

        $sql = "SELECT * FROM {$tableName} WHERE user_id = ?";
        $stmt = $connection->prepare($sql);
        $stmt->execute([$userId]);

        $user = $stmt->fetch();
        if ($user) {
            // Reuse the shard connection: these tables are in the same database.
            $user['profile'] = $this->getUserProfile($userId, $connection);
            $user['addresses'] = $this->getUserAddresses($userId, $connection);
        }

        return $user;
    }

    /**
     * Find a user by username.
     *
     * Users are placed in shards by user ID, so the username does not tell
     * which shard holds the row: every shard is probed (using its unique
     * username index) until a match is found.
     *
     * @param string $username
     * @return array|false The first matching user row, or false if none exists
     */
    public function getUserByUsername($username) {
        $shardCount = Config::getInstance()->get('shard.user');

        for ($shardId = 0; $shardId < $shardCount; $shardId++) {
            $tableName = $this->router->getTableName('users', $shardId);
            $connection = $this->db->getConnection('user_center', $shardId);

            $sql = "SELECT * FROM {$tableName} WHERE username = ?";
            $stmt = $connection->prepare($sql);
            $stmt->execute([$username]);

            $user = $stmt->fetch();
            if ($user) {
                return $user;
            }
        }

        return false;
    }

    /**
     * Load several users by ID with one query per affected shard.
     *
     * @param int[] $userIds
     * @return array[] Matching user rows, grouped by shard (no global order)
     */
    public function getUsersByIds($userIds) {
        // Group the requested IDs by the shard that stores them.
        $idsByShard = [];
        foreach ($userIds as $userId) {
            $shardId = $this->router->getUserShard($userId);
            $idsByShard[$shardId][] = $userId;
        }

        $results = [];
        foreach ($idsByShard as $shardId => $shardUserIds) {
            $tableName = $this->router->getTableName('users', $shardId);
            $connection = $this->db->getConnection('user_center', $shardId);

            $placeholders = implode(',', array_fill(0, count($shardUserIds), '?'));
            $sql = "SELECT * FROM {$tableName} WHERE user_id IN ({$placeholders})";

            $stmt = $connection->prepare($sql);
            $stmt->execute($shardUserIds);

            $results = array_merge($results, $stmt->fetchAll());
        }

        return $results;
    }

    /**
     * Prefix search on username / email / mobile across all shards,
     * newest users first.
     *
     * Global pagination over shards: each shard returns its newest
     * (offset + pageSize) matches, the lists are merged and sorted, and only
     * then is the requested page cut out. Applying OFFSET inside each shard
     * would skip rows that belong on earlier pages. The cost grows with the
     * page number, so deep pages are expensive.
     *
     * @param array $conditions Optional keys: username, email, mobile (prefixes)
     * @param int   $page       1-based page number
     * @param int   $pageSize   Rows per page
     * @return array[] At most $pageSize user rows
     */
    public function searchUsers($conditions, $page = 1, $pageSize = 20) {
        $page = max(1, (int)$page);
        $pageSize = max(1, (int)$pageSize);
        $offset = ($page - 1) * $pageSize;
        $perShardLimit = $offset + $pageSize;

        // The filter is identical for every shard, so build it once.
        $where = [];
        $filterParams = [];
        foreach (['username', 'email', 'mobile'] as $column) {
            if (!empty($conditions[$column])) {
                $where[] = "{$column} LIKE ?";
                $filterParams[] = $conditions[$column] . '%';
            }
        }
        $whereClause = $where ? "WHERE " . implode(' AND ', $where) : "";

        $allUsers = [];
        $shardCount = Config::getInstance()->get('shard.user');

        for ($shardId = 0; $shardId < $shardCount; $shardId++) {
            $tableName = $this->router->getTableName('users', $shardId);
            $connection = $this->db->getConnection('user_center', $shardId);

            $sql = "SELECT * FROM {$tableName} {$whereClause} ORDER BY created_at DESC LIMIT ?";
            $stmt = $connection->prepare($sql);

            $position = 1;
            foreach ($filterParams as $value) {
                $stmt->bindValue($position++, $value, PDO::PARAM_STR);
            }
            // LIMIT must be bound as an integer to be valid in every PDO mode.
            $stmt->bindValue($position, $perShardLimit, PDO::PARAM_INT);
            $stmt->execute();

            $allUsers = array_merge($allUsers, $stmt->fetchAll());
        }

        // Merge step: newest first across all shards, then cut the page.
        usort($allUsers, function ($a, $b) {
            return strcmp((string)$b['created_at'], (string)$a['created_at']);
        });

        return array_slice($allUsers, $offset, $pageSize);
    }

    /**
     * Delete a user together with the profile and address rows, atomically.
     *
     * All three deletes run on the same PDO connection so that a failure
     * rolls back every one of them.
     *
     * @param int $userId
     * @return bool Always true on success
     * @throws Exception If any statement fails (the transaction is rolled back)
     */
    public function deleteUser($userId) {
        $shardId = $this->router->getUserShard($userId);
        $tableName = $this->router->getTableName('users', $shardId);
        $connection = $this->db->getConnection('user_center', $shardId);

        try {
            $connection->beginTransaction();

            $sql = "DELETE FROM {$tableName} WHERE user_id = ?";
            $stmt = $connection->prepare($sql);
            $stmt->execute([$userId]);

            // Related rows, inside the same transaction.
            $this->deleteUserProfile($userId, $connection);
            $this->deleteUserAddresses($userId, $connection);

            $connection->commit();
            return true;

        } catch (Exception $e) {
            if ($connection->inTransaction()) {
                $connection->rollBack();
            }
            throw new Exception("删除用户失败: " . $e->getMessage(), 0, $e);
        }
    }

    // ---- Profile / address helpers -------------------------------------
    // Each helper accepts the connection to run on, so a caller inside a
    // transaction can keep the statement in that transaction. Without one,
    // the plain user_center connection is used.

    /**
     * Insert the user_profiles row; missing optional fields get defaults.
     */
    private function createUserProfile($userId, $userData, $connection = null) {
        $connection = $connection ?: $this->db->getConnection('user_center');

        $sql = "INSERT INTO user_profiles 
                (user_id, real_name, avatar, gender, birthday, bio) 
                VALUES (?, ?, ?, ?, ?, ?)";

        $stmt = $connection->prepare($sql);
        return $stmt->execute([
            $userId,
            $userData['real_name'] ?? '',
            $userData['avatar'] ?? '',
            $userData['gender'] ?? 0,
            $userData['birthday'] ?? null,
            $userData['bio'] ?? ''
        ]);
    }

    /**
     * Fetch the profile row for a user, or false if there is none.
     */
    private function getUserProfile($userId, $connection = null) {
        $connection = $connection ?: $this->db->getConnection('user_center');

        $sql = "SELECT * FROM user_profiles WHERE user_id = ?";
        $stmt = $connection->prepare($sql);
        $stmt->execute([$userId]);

        return $stmt->fetch();
    }

    /**
     * Fetch all addresses of a user, default address first.
     */
    private function getUserAddresses($userId, $connection = null) {
        $connection = $connection ?: $this->db->getConnection('user_center');

        $sql = "SELECT * FROM user_address WHERE user_id = ? ORDER BY is_default DESC";
        $stmt = $connection->prepare($sql);
        $stmt->execute([$userId]);

        return $stmt->fetchAll();
    }

    /**
     * Delete the profile row of a user.
     */
    private function deleteUserProfile($userId, $connection = null) {
        $connection = $connection ?: $this->db->getConnection('user_center');

        $sql = "DELETE FROM user_profiles WHERE user_id = ?";
        $stmt = $connection->prepare($sql);
        return $stmt->execute([$userId]);
    }

    /**
     * Delete every address row of a user.
     */
    private function deleteUserAddresses($userId, $connection = null) {
        $connection = $connection ?: $this->db->getConnection('user_center');

        $sql = "DELETE FROM user_address WHERE user_id = ?";
        $stmt = $connection->prepare($sql);
        return $stmt->execute([$userId]);
    }
}

5. 订单服务类示例

<?php
// OrderService.php
class OrderService {
    /** @var DatabaseManager Shared connection manager. */
    private $db;
    /** @var ShardRouter Computes shard numbers, table names and IDs. */
    private $router;

    public function __construct() {
        $this->db = DatabaseManager::getInstance();
        $this->router = new ShardRouter();
    }

    /**
     * Create an order and its items in one transaction.
     *
     * The order is stored in the shard of its user (user ID modulo shard
     * count). That shard number is also passed to generateId() for the order
     * and item IDs. orders_N and order_items_N share the suffix, so both are
     * written through the same connection and transaction.
     *
     * @param array $orderData Keys: user_id, total_amount, items (list of
     *                         arrays with product_id, product_name, price,
     *                         quantity)
     * @return int The generated order ID
     * @throws Exception If any statement fails (the transaction is rolled back)
     */
    public function createOrder($orderData) {
        $userId = $orderData['user_id'];
        $shardId = $this->router->getUserShard($userId);

        $orderId = $this->router->generateId($shardId);
        $orderTable = $this->router->getTableName('orders', $shardId);
        $itemTable = $this->router->getTableName('order_items', $shardId);
        $connection = $this->db->getConnection('order_center', $shardId);

        try {
            $connection->beginTransaction();

            // Order header
            $sql = "INSERT INTO {$orderTable} 
                    (order_id, user_id, total_amount, status, payment_status, created_at) 
                    VALUES (?, ?, ?, 1, 0, NOW())";

            $stmt = $connection->prepare($sql);
            $stmt->execute([
                $orderId,
                $userId,
                $orderData['total_amount']
            ]);

            // Order items: the statement is the same for every item, so it is
            // prepared once and executed per item.
            $itemSql = "INSERT INTO {$itemTable} 
                        (item_id, order_id, product_id, product_name, price, quantity) 
                        VALUES (?, ?, ?, ?, ?, ?)";
            $itemStmt = $connection->prepare($itemSql);

            foreach ($orderData['items'] as $item) {
                $itemStmt->execute([
                    $this->router->generateId($shardId),
                    $orderId,
                    $item['product_id'],
                    $item['product_name'],
                    $item['price'],
                    $item['quantity']
                ]);
            }

            $connection->commit();
            return $orderId;

        } catch (Exception $e) {
            // Guarded: rolling back without an active transaction would throw
            // and hide the original error.
            if ($connection->inTransaction()) {
                $connection->rollBack();
            }
            throw new Exception("创建订单失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Change the status of an order.
     *
     * Only the status column is written. A modification timestamp, if the
     * table keeps one, is left to the table definition (e.g. ON UPDATE
     * CURRENT_TIMESTAMP) instead of being set from PHP, so this statement
     * does not depend on such a column existing.
     *
     * @param int $orderId
     * @param int $status
     * @return bool Result of PDOStatement::execute()
     */
    public function updateOrderStatus($orderId, $status) {
        $shardId = $this->router->getOrderShard($orderId);
        $tableName = $this->router->getTableName('orders', $shardId);
        $connection = $this->db->getConnection('order_center', $shardId);

        $sql = "UPDATE {$tableName} SET status = ? WHERE order_id = ?";
        $stmt = $connection->prepare($sql);

        return $stmt->execute([$status, $orderId]);
    }

    /**
     * Load an order with its items by order ID.
     *
     * @param int $orderId
     * @return array|false The order row with an extra 'items' key, or false
     *                     if the order is not found in the routed shard
     */
    public function getOrderById($orderId) {
        $shardId = $this->router->getOrderShard($orderId);
        $orderTable = $this->router->getTableName('orders', $shardId);
        $itemTable = $this->router->getTableName('order_items', $shardId);
        $connection = $this->db->getConnection('order_center', $shardId);

        // Order header
        $sql = "SELECT * FROM {$orderTable} WHERE order_id = ?";
        $stmt = $connection->prepare($sql);
        $stmt->execute([$orderId]);

        $order = $stmt->fetch();
        if ($order) {
            // Items live in the item table with the same shard suffix.
            $sql = "SELECT * FROM {$itemTable} WHERE order_id = ?";
            $stmt = $connection->prepare($sql);
            $stmt->execute([$orderId]);
            $order['items'] = $stmt->fetchAll();
        }

        return $order;
    }

    /**
     * List the orders of one user, newest first, one page at a time.
     *
     * All orders of a user are in a single shard, so ordinary
     * LIMIT/OFFSET pagination is correct here.
     *
     * @param int $userId
     * @param int $page     1-based page number
     * @param int $pageSize Rows per page
     * @return array[] Order rows (without items)
     */
    public function getUserOrders($userId, $page = 1, $pageSize = 20) {
        $shardId = $this->router->getUserShard($userId);
        $tableName = $this->router->getTableName('orders', $shardId);
        $connection = $this->db->getConnection('order_center', $shardId);

        $page = max(1, (int)$page);
        $pageSize = max(1, (int)$pageSize);
        $offset = ($page - 1) * $pageSize;

        $sql = "SELECT * FROM {$tableName} 
                WHERE user_id = ? 
                ORDER BY created_at DESC 
                LIMIT ? OFFSET ?";

        $stmt = $connection->prepare($sql);
        $stmt->bindValue(1, $userId);
        // LIMIT/OFFSET are bound as integers so the statement is valid
        // whether or not prepared-statement emulation is enabled.
        $stmt->bindValue(2, $pageSize, PDO::PARAM_INT);
        $stmt->bindValue(3, $offset, PDO::PARAM_INT);
        $stmt->execute();

        return $stmt->fetchAll();
    }
}

6. 使用示例

6.1 用户注册和订单创建

<?php
// register_and_order.php
// End-to-end demo: register a user, place an order for that user, then read
// everything back. Requires the user_center / order_center shard tables.
require_once 'Config.php';
require_once 'ShardRouter.php';
require_once 'DatabaseManager.php';
require_once 'UserService.php';
require_once 'OrderService.php';

try {
    // Set up the services
    $userService = new UserService();
    $orderService = new OrderService();

    // 1. Create the user
    $userData = [
        'username' => 'zhangsan',
        'email' => 'zhangsan@example.com',
        'password' => '123456',
        'mobile' => '13800138000',
        'real_name' => '张三',
        'avatar' => 'avatar.jpg'
    ];

    $userId = $userService->createUser($userData);
    echo "用户创建成功,ID: {$userId}\n";

    // 2. Create an order for that user.
    // Monetary amounts are passed as decimal strings (not floats) so the
    // DECIMAL(10,2) columns receive the exact value.
    $orderData = [
        'user_id' => $userId,
        'total_amount' => '299.00',
        'items' => [
            [
                'product_id' => 1001,
                'product_name' => 'iPhone 14',
                'price' => '299.00',
                'quantity' => 1
            ]
        ]
    ];

    $orderId = $orderService->createOrder($orderData);
    echo "订单创建成功,ID: {$orderId}\n";

    // 3. Read the user back
    $user = $userService->getUserById($userId);
    if (is_array($user)) {
        // The row contains the stored password hash; never print it.
        unset($user['password']);
    }
    echo "用户信息: " . json_encode($user, JSON_UNESCAPED_UNICODE) . "\n";

    // 4. Read the order back
    $order = $orderService->getOrderById($orderId);
    echo "订单信息: " . json_encode($order, JSON_UNESCAPED_UNICODE) . "\n";

    // 5. List the user's orders
    $orders = $orderService->getUserOrders($userId);
    echo "用户订单数量: " . count($orders) . "\n";

} catch (Exception $e) {
    echo "操作失败: " . $e->getMessage() . "\n";
}

6.2 批量操作示例

<?php
// batch_operations.php
// Batch demo: multi-ID lookup, cross-shard search and a single-row update.
// UserService uses Config, ShardRouter and DatabaseManager, so those files
// must be loaded as well (no autoloader is used in these examples).
require_once 'Config.php';
require_once 'ShardRouter.php';
require_once 'DatabaseManager.php';
require_once 'UserService.php';

try {
    $userService = new UserService();

    // Look up several users at once (one query per affected shard)
    $userIds = [123456, 234567, 345678, 456789];
    $users = $userService->getUsersByIds($userIds);
    echo "批量查询到 " . count($users) . " 个用户\n";

    // Search across all shards by username / mobile prefix
    $conditions = [
        'username' => 'zhang',
        'mobile' => '138'
    ];
    $users = $userService->searchUsers($conditions, 1, 10);
    echo "搜索到 " . count($users) . " 个用户\n";

    // Update one user's contact details
    $updateResult = $userService->updateUser(123456, [
        'email' => 'new_email@example.com',
        'mobile' => '13900139000'
    ]);
    echo "更新结果: " . ($updateResult ? '成功' : '失败') . "\n";

} catch (Exception $e) {
    // The service methods throw on connection or query failure.
    echo "操作失败: " . $e->getMessage() . "\n";
}

三、 总结

写入操作:

  • 先计算分片:根据分片键(如用户ID)计算目标分片

  • 获取对应连接:连接到正确的数据库和分表

  • 执行写入:在目标分片上执行INSERT/UPDATE

  • 事务管理:同一分片内的事务可以保证一致性(前提是事务内的所有语句都通过同一个数据库连接执行;不同连接或不同库上的操作不属于同一个事务)

查询操作:

  • 精确查询:通过分片键直接定位到具体分片;按非分片键(如用户名)查询时无法直接定位,需要遍历所有分片或额外维护"用户名 → 用户ID"的映射表

  • 批量查询:按分片分组,分别查询后合并结果

  • 复杂查询:遍历所有分片,在PHP层进行结果聚合;需要分页时,每个分片按相同规则排序并取前 (offset + pageSize) 条,合并排序后再统一截取当前页

  • 跨库查询:避免跨分片JOIN,通过多次查询在应用层组装

更新操作:

  • 定位分片:通过分片键找到数据所在分片

  • 单分片更新:在单个分片内完成更新

  • 批量更新:按分片分组执行

删除操作:

  • 定位分片:找到数据所在分片

  • 事务处理:删除主表和关联表数据

  • 级联删除:在应用层处理关联数据删除

说明

这种方案虽然增加了应用层的复杂性,但避免了中间件的依赖,提供了更大的灵活性。

在实际应用中,还需要考虑连接池、缓存、监控等配套措施。