跳转至

asmysql v2 使用示例

本文档提供 asmysql v2 版本的详细使用示例。

导航

基础示例

最简单的查询

import asyncio
from asmysql import Engine

async def main():
    # 创建引擎
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")

    # 连接
    await engine.connect()

    # 执行查询
    result = await engine.execute("SELECT 1 as value")
    data = await result.fetch_one()
    print(data)  # (1,)

    # 断开连接
    await engine.disconnect()

asyncio.run(main())

使用上下文管理器

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")

    async with engine:
        result = await engine.execute("SELECT 1 as value")
        data = await result.fetch_one()
        print(data)

asyncio.run(main())

连接管理

检查连接状态

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")

    print(f"连接前: {engine.is_connected}")  # False

    await engine.connect()
    print(f"连接后: {engine.is_connected}")  # True

    # 查看连接池状态
    status = engine.status
    print(f"连接地址: {status['address']}")
    print(f"连接池大小: {status['pool_size']}")
    print(f"空闲连接: {status['pool_free']}")
    print(f"使用中连接: {status['pool_used']}")

    await engine.disconnect()

asyncio.run(main())

连接池配置

from asmysql import Engine

# 配置连接池
engine = Engine(
    url="mysql://root:pass@127.0.0.1:3306/test_db",
    min_pool_size=5,      # 最小连接数
    max_pool_size=50,     # 最大连接数
    pool_recycle=3600,    # 1 小时回收空闲连接
    connect_timeout=10    # 连接超时 10 秒
)

查询操作

单条记录查询

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 使用 tuple 类型(默认)
    result = await engine.execute(
        "SELECT id, name, email FROM users WHERE id = %s",
        (1,)
    )
    user = await result.fetch_one()
    if user:
        print(f"ID: {user[0]}, 姓名: {user[1]}, 邮箱: {user[2]}")

    # 使用 dict 类型
    result = await engine.execute(
        "SELECT id, name, email FROM users WHERE id = %s",
        (1,),
        result_class=dict
    )
    user = await result.fetch_one()
    if user:
        print(f"ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}")

    await engine.disconnect()

asyncio.run(main())

多条记录查询

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 获取所有记录
    result = await engine.execute("SELECT * FROM users")
    all_users = await result.fetch_all()
    print(f"共 {len(all_users)} 条记录")

    # 获取指定数量的记录
    result = await engine.execute("SELECT * FROM users")
    users = await result.fetch_many(10)  # 获取 10 条
    print(f"获取了 {len(users)} 条记录")

    await engine.disconnect()

asyncio.run(main())

使用字典参数

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 使用字典参数
    result = await engine.execute(
        "SELECT * FROM users WHERE name = %(name)s AND age > %(age)s",
        {"name": "张三", "age": 18}
    )
    users = await result.fetch_all()

    await engine.disconnect()

asyncio.run(main())

使用迭代器

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 方式一:使用 iterate() 方法
    result = await engine.execute("SELECT * FROM users")
    async for user in result.iterate():
        print(user)

    # 方式二:直接迭代 result
    result = await engine.execute("SELECT * FROM users")
    async for user in result:
        print(user)

    # 方式三:直接迭代 execute 返回的结果
    async for user in engine.execute("SELECT * FROM users"):
        print(user)

    await engine.disconnect()

asyncio.run(main())

插入操作

单条插入

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 插入数据
    result = await engine.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        ("张三", "zhangsan@example.com")
    )

    if result.error:
        print(f"插入失败: {result.error_msg}")
    else:
        print(f"插入成功,受影响行数: {result.row_count}")
        print(f"新插入的 ID: {result.last_rowid}")

    await engine.disconnect()

asyncio.run(main())

获取插入的 ID

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    result = await engine.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        ("李四", "lisi@example.com")
    )

    if not result.error:
        new_id = result.last_rowid
        print(f"新用户的 ID: {new_id}")

    await engine.disconnect()

asyncio.run(main())

更新操作

更新单条记录

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    result = await engine.execute(
        "UPDATE users SET email = %s WHERE id = %s",
        ("newemail@example.com", 1)
    )

    if result.error:
        print(f"更新失败: {result.error_msg}")
    else:
        print(f"更新成功,受影响行数: {result.row_count}")

    await engine.disconnect()

asyncio.run(main())

批量更新

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 使用 execute_many 批量更新
    updates = [
        ("email1@example.com", 1),
        ("email2@example.com", 2),
        ("email3@example.com", 3)
    ]

    result = await engine.execute_many(
        "UPDATE users SET email = %s WHERE id = %s",
        updates
    )

    if not result.error:
        print(f"批量更新成功,受影响行数: {result.row_count}")

    await engine.disconnect()

asyncio.run(main())

删除操作

删除记录

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    result = await engine.execute(
        "DELETE FROM users WHERE id = %s",
        (1,)
    )

    if result.error:
        print(f"删除失败: {result.error_msg}")
    else:
        print(f"删除成功,受影响行数: {result.row_count}")

    await engine.disconnect()

asyncio.run(main())

事务处理

基本事务

import asyncio
from asmysql import Engine

async def main():
    # 关闭自动提交
    engine = Engine(
        url="mysql://root:pass@127.0.0.1:3306/test_db",
        auto_commit=False
    )
    await engine.connect()

    try:
        # 插入用户
        result1 = await engine.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            ("张三", "zhangsan@example.com"),
            commit=False
        )
        if result1.error:
            raise Exception(result1.error_msg)

        user_id = result1.last_rowid

        # 插入用户详情
        result2 = await engine.execute(
            "INSERT INTO user_profiles (user_id, bio) VALUES (%s, %s)",
            (user_id, "这是张三的个人简介"),
            commit=False
        )
        if result2.error:
            raise Exception(result2.error_msg)

        # 手动提交(需要访问连接对象)
        # 注意:v2 版本的事务管理需要手动控制
        # 这里简化示例,实际使用时需要更精细的控制

    except Exception as e:
        print(f"事务失败: {e}")
        # 回滚操作需要手动处理

    await engine.disconnect()

asyncio.run(main())

流式查询

处理大数据集

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 启用流式查询
    result = await engine.execute(
        "SELECT * FROM large_table",
        stream=True
    )

    count = 0
    async for row in result:
        count += 1
        # 处理每一行数据,不占用内存
        process_row(row)

        if count % 1000 == 0:
            print(f"已处理 {count} 条记录")

    print(f"总共处理了 {count} 条记录")

    await engine.disconnect()

def process_row(row):
    """处理单行数据"""
    pass

asyncio.run(main())

流式查询与普通查询对比

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 普通查询:所有数据加载到内存
    result1 = await engine.execute("SELECT * FROM large_table", stream=False)
    all_data = await result1.fetch_all()  # 可能内存溢出
    print(f"普通查询获取 {len(all_data)} 条记录")

    # 流式查询:逐行处理,不占用内存
    result2 = await engine.execute("SELECT * FROM large_table", stream=True)
    count = 0
    async for row in result2:
        count += 1
    print(f"流式查询处理 {count} 条记录")

    await engine.disconnect()

asyncio.run(main())

批量操作

批量插入

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 准备批量数据
    users = [
        ("张三", "zhangsan@example.com"),
        ("李四", "lisi@example.com"),
        ("王五", "wangwu@example.com"),
        ("赵六", "zhaoliu@example.com")
    ]

    # 批量插入
    result = await engine.execute_many(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        users
    )

    if result.error:
        print(f"批量插入失败: {result.error_msg}")
    else:
        print(f"批量插入成功,插入了 {result.row_count} 条记录")

    await engine.disconnect()

asyncio.run(main())

使用字典批量插入

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 使用字典格式的批量数据
    users = [
        {"name": "张三", "email": "zhangsan@example.com"},
        {"name": "李四", "email": "lisi@example.com"},
        {"name": "王五", "email": "wangwu@example.com"}
    ]

    result = await engine.execute_many(
        "INSERT INTO users (name, email) VALUES (%(name)s, %(email)s)",
        users
    )

    if not result.error:
        print(f"批量插入成功,插入了 {result.row_count} 条记录")

    await engine.disconnect()

asyncio.run(main())

自定义模型

使用 Pydantic 模型

import asyncio
from pydantic import BaseModel
from asmysql import Engine

# 定义用户模型
class User(BaseModel):
    id: int
    name: str
    email: str
    age: int = 0

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    # 查询并使用自定义模型
    result = await engine.execute(
        "SELECT id, name, email, age FROM users WHERE id = %s",
        (1,),
        result_class=User
    )

    user = await result.fetch_one()
    if user:
        print(f"用户: {user.name}, 邮箱: {user.email}, 年龄: {user.age}")
        # 可以使用模型的所有功能
        print(user.model_dump())

    # 批量查询
    result = await engine.execute(
        "SELECT id, name, email, age FROM users",
        result_class=User
    )

    users = []
    async for user in result.iterate():
        users.append(user)
        print(f"{user.name}: {user.email}")

    await engine.disconnect()

asyncio.run(main())

使用普通类

import asyncio
from asmysql import Engine

# 定义普通类
class User:
    def __init__(self, id: int, name: str, email: str):
        self.id = id
        self.name = name
        self.email = email

    def __repr__(self):
        return f"User(id={self.id}, name={self.name}, email={self.email})"

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    result = await engine.execute(
        "SELECT id, name, email FROM users",
        result_class=User
    )

    async for user in result:
        print(user)  # 自动转换为 User 实例

    await engine.disconnect()

asyncio.run(main())

错误处理

基本错误处理

import asyncio
from asmysql import Engine

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    result = await engine.execute("SELECT * FROM non_existent_table")

    if result.error:
        print(f"错误码: {result.error_no}")
        print(f"错误信息: {result.error_msg}")
        print(f"错误对象: {result.error}")
    else:
        data = await result.fetch_all()
        print(f"查询成功,获取 {len(data)} 条记录")

    await engine.disconnect()

asyncio.run(main())

错误处理装饰器

import asyncio
from functools import wraps
from asmysql import Engine

def handle_db_errors(func):
    """数据库错误处理装饰器"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            print(f"数据库操作失败: {e}")
            return None
    return wrapper

@handle_db_errors
async def get_user(engine: Engine, user_id: int):
    result = await engine.execute(
        "SELECT * FROM users WHERE id = %s",
        (user_id,),
        result_class=dict
    )
    if result.error:
        raise Exception(f"查询失败: {result.error_msg}")
    return await result.fetch_one()

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    user = await get_user(engine, 1)
    if user:
        print(f"用户: {user['name']}")

    await engine.disconnect()

asyncio.run(main())

业务逻辑封装

用户服务类

import asyncio
from typing import Optional, List
from pydantic import BaseModel
from asmysql import Engine, AsMysql

class User(BaseModel):
    id: int
    name: str
    email: str
    age: int = 0

class UserService(AsMysql):
    """用户服务类"""

    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """根据 ID 获取用户"""
        result = await self.client.execute(
            "SELECT id, name, email, age FROM users WHERE id = %s",
            (user_id,),
            result_class=User
        )
        if result.error:
            return None
        return await result.fetch_one()

    async def get_user_by_email(self, email: str) -> Optional[User]:
        """根据邮箱获取用户"""
        result = await self.client.execute(
            "SELECT id, name, email, age FROM users WHERE email = %s",
            (email,),
            result_class=User
        )
        if result.error:
            return None
        return await result.fetch_one()

    async def list_users(self, limit: int = 100) -> List[User]:
        """获取用户列表"""
        result = await self.client.execute(
            "SELECT id, name, email, age FROM users LIMIT %s",
            (limit,),
            result_class=User
        )
        if result.error:
            return []
        return await result.fetch_all()

    async def create_user(self, name: str, email: str, age: int = 0) -> Optional[int]:
        """创建用户"""
        result = await self.client.execute(
            "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
            (name, email, age)
        )
        if result.error:
            return None
        return result.last_rowid

    async def update_user(self, user_id: int, name: str = None, email: str = None, age: int = None) -> bool:
        """更新用户"""
        updates = []
        values = []

        if name is not None:
            updates.append("name = %s")
            values.append(name)
        if email is not None:
            updates.append("email = %s")
            values.append(email)
        if age is not None:
            updates.append("age = %s")
            values.append(age)

        if not updates:
            return False

        values.append(user_id)
        query = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"

        result = await self.client.execute(query, tuple(values))
        return result.error is None

    async def delete_user(self, user_id: int) -> bool:
        """删除用户"""
        result = await self.client.execute(
            "DELETE FROM users WHERE id = %s",
            (user_id,)
        )
        return result.error is None

async def main():
    engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")
    await engine.connect()

    user_service = UserService(engine)

    # 创建用户
    user_id = await user_service.create_user("张三", "zhangsan@example.com", 25)
    print(f"创建用户,ID: {user_id}")

    # 获取用户
    user = await user_service.get_user_by_id(user_id)
    if user:
        print(f"用户: {user.name}, 邮箱: {user.email}, 年龄: {user.age}")

    # 更新用户
    success = await user_service.update_user(user_id, age=26)
    print(f"更新用户: {success}")

    # 获取用户列表
    users = await user_service.list_users(10)
    print(f"用户列表,共 {len(users)} 条")

    await engine.disconnect()

asyncio.run(main())

完整应用示例

Web 应用示例(FastAPI)

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from asmysql import Engine, AsMysql
from contextlib import asynccontextmanager

# 全局引擎
engine = Engine(url="mysql://root:pass@127.0.0.1:3306/test_db")

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时连接
    await engine.connect()
    yield
    # 关闭时断开
    await engine.disconnect()

app = FastAPI(lifespan=lifespan)

# 数据模型
class UserCreate(BaseModel):
    name: str
    email: str
    age: int = 0

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    age: int

# 服务类
class UserService(AsMysql):
    async def create_user(self, user_data: UserCreate) -> int:
        result = await self.client.execute(
            "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
            (user_data.name, user_data.email, user_data.age)
        )
        if result.error:
            raise HTTPException(status_code=500, detail=result.error_msg)
        return result.last_rowid

    async def get_user(self, user_id: int) -> UserResponse:
        result = await self.client.execute(
            "SELECT id, name, email, age FROM users WHERE id = %s",
            (user_id,),
            result_class=UserResponse
        )
        if result.error:
            raise HTTPException(status_code=500, detail=result.error_msg)
        user = await result.fetch_one()
        if not user:
            raise HTTPException(status_code=404, detail="用户不存在")
        return user

# API 路由
@app.post("/users", response_model=UserResponse)
async def create_user(user_data: UserCreate):
    service = UserService(engine)
    user_id = await service.create_user(user_data)
    return await service.get_user(user_id)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    service = UserService(engine)
    return await service.get_user(user_id)

命令行工具示例

import asyncio
import argparse
from asmysql import Engine, AsMysql

class UserCLI(AsMysql):
    async def list_users(self):
        """列出所有用户"""
        result = await self.client.execute(
            "SELECT id, name, email FROM users",
            result_class=dict
        )
        if result.error:
            print(f"错误: {result.error_msg}")
            return

        users = await result.fetch_all()
        print(f"共 {len(users)} 个用户:")
        for user in users:
            print(f"  ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}")

    async def add_user(self, name: str, email: str):
        """添加用户"""
        result = await self.client.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            (name, email)
        )
        if result.error:
            print(f"添加失败: {result.error_msg}")
        else:
            print(f"添加成功,ID: {result.last_rowid}")

async def main():
    parser = argparse.ArgumentParser(description="用户管理工具")
    parser.add_argument("--host", default="127.0.0.1", help="MySQL 主机")
    parser.add_argument("--port", type=int, default=3306, help="MySQL 端口")
    parser.add_argument("--user", default="root", help="MySQL 用户")
    parser.add_argument("--password", default="", help="MySQL 密码")
    parser.add_argument("--database", required=True, help="数据库名")

    subparsers = parser.add_subparsers(dest="command", help="命令")
    subparsers.add_parser("list", help="列出用户")
    add_parser = subparsers.add_parser("add", help="添加用户")
    add_parser.add_argument("name", help="用户名")
    add_parser.add_argument("email", help="邮箱")

    args = parser.parse_args()

    # 创建引擎
    engine = Engine(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password
    )

    await engine.connect()

    # 选择数据库
    await engine.execute(f"USE {args.database}")

    # 创建 CLI 实例
    cli = UserCLI(engine)

    # 执行命令
    if args.command == "list":
        await cli.list_users()
    elif args.command == "add":
        await cli.add_user(args.name, args.email)

    await engine.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

这些示例涵盖了 asmysql v2 版本的主要使用场景,可以根据实际需求进行修改和扩展。