Bài viết gần đây

| 🛠️ Thực hành và tối ưu

Được viết bởi thanhdt vào ngày 13/11/2025 lúc 06:11 | 6 lượt xem

Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python

Bắt lỗi và log chi tiết SQL Server

Giới thiệu

Khi làm việc với hệ thống phân tích dữ liệu kết hợp Python và SQL Server, việc xử lý lỗi và ghi log chi tiết là một phần không thể thiếu để đảm bảo tính ổn định và khả năng bảo trì của ứng dụng. Một hệ thống ghi log được thiết kế tốt giúp phát hiện, phân tích và khắc phục lỗi nhanh chóng, đồng thời cung cấp thông tin quý giá về hiệu suất và hành vi của hệ thống. Bài viết này sẽ hướng dẫn chi tiết các kỹ thuật bắt lỗi và thiết lập hệ thống log hiệu quả khi thao tác với SQL Server từ Python.

1. Tổng quan về xử lý lỗi và logging trong Python

1.1. Các loại lỗi thường gặp khi làm việc với SQL Server

Khi thao tác với SQL Server từ Python, chúng ta thường gặp các loại lỗi sau:

  1. Lỗi kết nối: Không thể kết nối đến máy chủ SQL Server
  2. Lỗi xác thực: Sai thông tin đăng nhập
  3. Lỗi cú pháp SQL: Lỗi trong câu lệnh SQL
  4. Lỗi thời gian chờ: Truy vấn mất quá nhiều thời gian để thực thi
  5. Lỗi ràng buộc dữ liệu: Vi phạm các ràng buộc như khóa ngoại, giá trị duy nhất, v.v
  6. Lỗi chuyển đổi kiểu dữ liệu: Không thể chuyển đổi dữ liệu giữa Python và SQL Server
  7. Lỗi tài nguyên: Hết bộ nhớ, kết nối, v.v

1.2. Hệ thống log trong Python

Python cung cấp module logging tiêu chuẩn giúp ghi log với nhiều cấp độ khác nhau:

  • DEBUG: Thông tin chi tiết, thường dùng khi gỡ lỗi
  • INFO: Xác nhận mọi thứ đang hoạt động như mong đợi
  • WARNING: Chỉ ra rằng có điều gì đó không mong muốn xảy ra, nhưng ứng dụng vẫn hoạt động
  • ERROR: Do lỗi, ứng dụng không thể thực hiện một số chức năng
  • CRITICAL: Lỗi nghiêm trọng, ứng dụng có thể không tiếp tục hoạt động

2. Thiết lập cơ bản cho logging

2.1. Thiết lập logging cơ bản

import logging

# Cấu hình cơ bản
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='sql_operations.log',
    filemode='a'  # Append mode
)

# Tạo logger
logger = logging.getLogger('sql_server_operations')

2.2. Cấu hình handlers đa dạng

import logging
import os
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

def setup_logger(name, log_file, level=logging.INFO):
    """Thiết lập logger với file và console handlers"""

    # Tạo thư mục logs nếu chưa tồn tại
    log_dir = os.path.dirname(log_file)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # Tạo và cấu hình logger
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Ngăn log trùng lặp
    if logger.handlers:
        return logger

    # Tạo file handler sử dụng RotatingFileHandler
    file_handler = RotatingFileHandler(
        log_file, maxBytes=10*1024*1024, backupCount=5
    )
    file_handler.setLevel(level)

    # Tạo console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)

    # Tạo formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # Thêm handlers vào logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

# Sử dụng
sql_logger = setup_logger(
    'sql_server_operations', 
    os.path.join('logs', 'sql_operations.log')
)

2.3. Sử dụng TimedRotatingFileHandler để phân chia log theo thời gian

def setup_timed_logger(name, log_file, level=logging.INFO):
    """Thiết lập logger với TimedRotatingFileHandler để phân chia log theo ngày"""

    logger = logging.getLogger(name)
    logger.setLevel(level)

    if logger.handlers:
        return logger

    # Tạo thư mục logs nếu chưa tồn tại
    log_dir = os.path.dirname(log_file)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # Tạo file handler sử dụng TimedRotatingFileHandler
    # Phân chia file log mỗi ngày, giữ lại 30 file
    file_handler = TimedRotatingFileHandler(
        log_file, when='midnight', interval=1, backupCount=30
    )
    file_handler.setLevel(level)

    # Tạo formatter bao gồm nhiều thông tin hơn
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
    )
    file_handler.setFormatter(formatter)

    logger.addHandler(file_handler)

    return logger

# Sử dụng
detailed_logger = setup_timed_logger(
    'sql_detailed_operations',
    os.path.join('logs', 'sql_operations_detailed.log')
)

3. Bắt và xử lý lỗi SQL Server

3.1. Xử lý lỗi cơ bản với try-except

import pyodbc
import logging

logger = logging.getLogger('sql_server_operations')

def execute_query(conn_string, query, params=None):
    """Thực thi truy vấn SQL với xử lý lỗi cơ bản"""

    conn = None
    cursor = None

    try:
        # Thiết lập kết nối
        conn = pyodbc.connect(conn_string)
        cursor = conn.cursor()

        # Thực thi truy vấn
        logger.info(f"Thực thi truy vấn: {query[:100]}...")

        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)

        # Commit nếu là truy vấn thay đổi dữ liệu
        if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')):
            conn.commit()
            logger.info("Đã commit thay đổi")
            return cursor.rowcount  # Trả về số hàng bị ảnh hưởng

        # Lấy kết quả nếu là truy vấn SELECT
        results = cursor.fetchall()
        logger.info(f"Truy vấn trả về {len(results)} kết quả")
        return results

    except pyodbc.Error as e:
        if conn:
            conn.rollback()
        logger.error(f"Lỗi SQL: {str(e)}")
        raise

    except Exception as e:
        if conn:
            conn.rollback()
        logger.error(f"Lỗi không xác định: {str(e)}")
        raise

    finally:
        # Đảm bảo đóng cursor và connection
        if cursor:
            cursor.close()
        if conn:
            conn.close()
        logger.debug("Đã đóng kết nối")

3.2. Xử lý lỗi chi tiết theo từng loại lỗi

import pyodbc
import time
import logging
from functools import wraps

logger = logging.getLogger('sql_detailed_operations')

# Định nghĩa các mã lỗi SQL Server phổ biến
SQL_TIMEOUT_ERROR = '08S01'  # Timeout
SQL_CONNECTION_ERROR = '08001'  # Không thể kết nối
SQL_CONSTRAINT_VIOLATION = '23000'  # Vi phạm ràng buộc

def retry_on_connection_error(max_attempts=3, delay=2):
    """Decorator để thử lại khi gặp lỗi kết nối"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            last_exception = None

            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except pyodbc.Error as e:
                    error_code = e.args[0] if len(e.args) > 0 else "Unknown"

                    # Chỉ thử lại với lỗi kết nối
                    if error_code in (SQL_TIMEOUT_ERROR, SQL_CONNECTION_ERROR):
                        attempts += 1
                        wait_time = delay * attempts  # Tăng thời gian chờ theo số lần thử

                        logger.warning(
                            f"Lỗi kết nối (mã: {error_code}). "
                            f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây. "
                            f"Chi tiết: {str(e)}"
                        )

                        time.sleep(wait_time)
                        last_exception = e
                    else:
                        # Với các lỗi khác thì ném ra ngay
                        raise

            # Nếu đã thử hết số lần mà vẫn lỗi
            logger.error(f"Đã thử {max_attempts} lần nhưng vẫn thất bại: {str(last_exception)}")
            raise last_exception

        return wrapper
    return decorator


class SQLServerError(Exception):
    """Lớp cơ sở cho các lỗi SQL Server tùy chỉnh"""
    def __init__(self, message, original_error=None, query=None, params=None):
        self.message = message
        self.original_error = original_error
        self.query = query
        self.params = params
        super().__init__(self.message)


class SQLConnectionError(SQLServerError):
    """Lỗi kết nối đến SQL Server"""
    pass


class SQLConstraintError(SQLServerError):
    """Lỗi vi phạm ràng buộc dữ liệu"""
    pass


class SQLTimeoutError(SQLServerError):
    """Lỗi timeout khi thực thi truy vấn"""
    pass


class SQLSyntaxError(SQLServerError):
    """Lỗi cú pháp SQL"""
    pass


@retry_on_connection_error(max_attempts=3, delay=2)
def execute_query_advanced(conn_string, query, params=None, timeout=30):
    """Thực thi truy vấn SQL với xử lý lỗi chi tiết"""

    conn = None
    cursor = None
    start_time = time.time()

    try:
        # Log thông tin truy vấn
        if params:
            masked_params = ['***' if i > 1 else str(p)[:10] for i, p in enumerate(params)]
            logger.info(f"Thực thi truy vấn với tham số: {query[:100]}... - Params: {masked_params}")
        else:
            logger.info(f"Thực thi truy vấn: {query[:100]}...")

        # Thiết lập kết nối
        conn = pyodbc.connect(conn_string, timeout=timeout)
        cursor = conn.cursor()

        # Thực thi truy vấn
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)

        # Đo thời gian thực thi
        execution_time = time.time() - start_time

        # Xác định loại truy vấn và xử lý kết quả phù hợp
        query_type = query.strip().upper().split()[0] if query.strip() else ""

        if query_type in ('INSERT', 'UPDATE', 'DELETE'):
            conn.commit()
            affected_rows = cursor.rowcount
            logger.info(f"Đã commit thay đổi. {affected_rows} hàng bị ảnh hưởng. "
                       f"Thời gian thực thi: {execution_time:.3f}s")
            return affected_rows

        elif query_type == 'SELECT':
            columns = [column[0] for column in cursor.description]
            results = cursor.fetchall()
            row_count = len(results)

            logger.info(f"Truy vấn SELECT thành công. Trả về {row_count} kết quả. "
                       f"Thời gian thực thi: {execution_time:.3f}s")

            # Log mẫu dữ liệu (chỉ log vài hàng đầu tiên để tránh quá tải)
            if row_count > 0 and logger.level <= logging.DEBUG:
                sample_data = str(results[0])
                if len(sample_data) > 200:
                    sample_data = sample_data[:200] + "..."
                logger.debug(f"Mẫu dữ liệu: {sample_data}")

            # Trả về kết quả dưới dạng list of dict để dễ sử dụng
            return [dict(zip(columns, row)) for row in results]

        else:
            conn.commit()
            logger.info(f"Đã thực thi truy vấn. Thời gian thực thi: {execution_time:.3f}s")
            return True

    except pyodbc.Error as e:
        # Rollback transaction nếu có lỗi
        if conn:
            try:
                conn.rollback()
                logger.info("Đã rollback transaction")
            except Exception:
                pass

        # Xác định mã lỗi
        error_code = e.args[0] if len(e.args) > 0 else "Unknown"
        error_message = str(e)

        # Ghi log và ném ra exception tùy chỉnh tương ứng
        if error_code == SQL_CONNECTION_ERROR:
            logger.error(f"Lỗi kết nối SQL Server: {error_message}")
            raise SQLConnectionError("Không thể kết nối đến SQL Server", e, query, params)

        elif error_code == SQL_TIMEOUT_ERROR:
            logger.error(f"Lỗi timeout SQL: {error_message}")
            raise SQLTimeoutError("Truy vấn bị timeout", e, query, params)

        elif error_code == SQL_CONSTRAINT_VIOLATION:
            logger.error(f"Lỗi vi phạm ràng buộc dữ liệu: {error_message}")
            raise SQLConstraintError("Vi phạm ràng buộc dữ liệu", e, query, params)

        elif 'syntax' in error_message.lower():
            logger.error(f"Lỗi cú pháp SQL: {error_message}")
            raise SQLSyntaxError("Lỗi cú pháp trong truy vấn SQL", e, query, params)

        else:
            logger.error(f"Lỗi SQL không xác định (mã: {error_code}): {error_message}")
            raise SQLServerError(f"Lỗi SQL Server: {error_message}", e, query, params)

    except Exception as e:
        # Xử lý các lỗi khác không phải từ SQL Server
        if conn:
            try:
                conn.rollback()
                logger.info("Đã rollback transaction")
            except Exception:
                pass

        logger.error(f"Lỗi không xác định: {str(e)}", exc_info=True)
        raise

    finally:
        # Đảm bảo đóng cursor và connection
        if cursor:
            cursor.close()
        if conn:
            conn.close()
        logger.debug("Đã đóng kết nối DB")

4. Thiết kế hệ thống log toàn diện

4.1. Tạo lớp Logger tùy chỉnh

import logging
import os
import json
import traceback
import socket
from datetime import datetime
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler


class SQLLogger:
    """Lớp logger tùy chỉnh cho các thao tác với SQL Server"""

    def __init__(self, app_name, log_dir='logs', log_level=logging.INFO):
        """Khởi tạo hệ thống log"""
        self.app_name = app_name
        self.log_dir = log_dir
        self.log_level = log_level
        self.hostname = socket.gethostname()

        # Tạo thư mục logs nếu chưa tồn tại
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        # Thiết lập các logger
        self.setup_loggers()

    def setup_loggers(self):
        """Thiết lập các logger khác nhau cho từng mục đích"""

        # Logger cho các thao tác SQL thông thường
        self.sql_logger = self._create_logger(
            'sql_operations',
            os.path.join(self.log_dir, 'sql_operations.log'),
            self.log_level
        )

        # Logger chi tiết cho lỗi
        self.error_logger = self._create_logger(
            'sql_errors',
            os.path.join(self.log_dir, 'sql_errors.log'),
            logging.ERROR
        )

        # Logger cho các truy vấn chậm
        self.slow_query_logger = self._create_logger(
            'slow_queries',
            os.path.join(self.log_dir, 'slow_queries.log'),
            logging.WARNING
        )

    def _create_logger(self, name, log_file, level):
        """Tạo logger với file handler và formatter"""

        # Tạo logger với tên ứng dụng làm prefix
        logger_name = f"{self.app_name}.{name}"
        logger = logging.getLogger(logger_name)
        logger.setLevel(level)

        # Ngăn log trùng lặp
        if logger.handlers:
            return logger

        # Tạo file handler với rotation
        file_handler = TimedRotatingFileHandler(
            log_file, when='midnight', interval=1, backupCount=30
        )
        file_handler.setLevel(level)

        # Tạo formatter chi tiết
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(name)s - [%(hostname)s] - '
            '%(pathname)s:%(lineno)d - %(message)s'
        )

        # Thêm thông tin hostname vào formatter
        old_factory = logging.getLogRecordFactory()

        def record_factory(*args, **kwargs):
            record = old_factory(*args, **kwargs)
            record.hostname = self.hostname
            return record

        logging.setLogRecordFactory(record_factory)

        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        return logger

    def log_query(self, query, params=None, duration=None, result_count=None):
        """Ghi log truy vấn SQL"""

        # Tạo thông tin log
        log_data = {
            'query': query[:500] + ('...' if len(query) > 500 else ''),
            'timestamp': datetime.now().isoformat(),
            'hostname': self.hostname
        }

        # Thêm params nếu có (che dấu thông tin nhạy cảm)
        if params:
            masked_params = []
            for p in params:
                if isinstance(p, str) and len(p) > 10:
                    masked_params.append(p[:5] + '...' + p[-2:])
                else:
                    masked_params.append(p)
            log_data['params'] = masked_params

        # Thêm thời gian thực thi nếu có
        if duration:
            log_data['duration'] = f"{duration:.3f}s"

            # Log truy vấn chậm (>1s) vào logger riêng
            if duration > 1.0:
                self.slow_query_logger.warning(
                    f"Truy vấn chậm: {log_data['query']} - "
                    f"Thời gian: {log_data['duration']}"
                )

        # Thêm số lượng kết quả nếu có
        if result_count is not None:
            log_data['result_count'] = result_count

        # Ghi log thông thường
        self.sql_logger.info(f"SQL Query: {json.dumps(log_data)}")

        return log_data

    def log_error(self, error, query=None, params=None, context=None):
        """Ghi log lỗi SQL với thông tin chi tiết"""

        error_type = type(error).__name__
        error_message = str(error)
        stack_trace = traceback.format_exc()

        # Tạo thông tin log
        error_data = {
            'error_type': error_type,
            'error_message': error_message,
            'timestamp': datetime.now().isoformat(),
            'hostname': self.hostname,
            'stack_trace': stack_trace
        }

        # Thêm query nếu có
        if query:
            error_data['query'] = query[:500] + ('...' if len(query) > 500 else '')

        # Thêm params nếu có (che dấu thông tin nhạy cảm)
        if params:
            masked_params = []
            for p in params:
                if isinstance(p, str) and len(p) > 10:
                    masked_params.append(p[:5] + '...' + p[-2:])
                else:
                    masked_params.append(p)
            error_data['params'] = masked_params

        # Thêm context nếu có
        if context:
            error_data['context'] = context

        # Ghi log lỗi
        self.error_logger.error(f"SQL Error: {json.dumps(error_data)}")

        # Đồng thời ghi log thông thường
        self.sql_logger.error(
            f"SQL Error: {error_type} - {error_message}"
        )

        return error_data

    def log_transaction(self, action, affected_rows=None, duration=None):
        """Ghi log các thao tác transaction"""

        log_data = {
            'action': action,
            'timestamp': datetime.now().isoformat(),
            'hostname': self.hostname
        }

        if affected_rows is not None:
            log_data['affected_rows'] = affected_rows

        if duration:
            log_data['duration'] = f"{duration:.3f}s"

        self.sql_logger.info(f"Transaction: {json.dumps(log_data)}")

        return log_data

4.2. Tích hợp logger tùy chỉnh với thao tác SQL

import pyodbc
import time
from contextlib import contextmanager

class SQLServerDatabase:
    """Lớp quản lý kết nối và thao tác với SQL Server kèm logging"""

    def __init__(self, conn_string, app_name="SQLApp", log_dir="logs"):
        """Khởi tạo với chuỗi kết nối và thiết lập logger"""
        self.conn_string = conn_string
        self.logger = SQLLogger(app_name, log_dir)

    @contextmanager
    def connection(self):
        """Context manager để quản lý kết nối tự động đóng"""
        conn = None
        try:
            conn = pyodbc.connect(self.conn_string)
            yield conn
        except pyodbc.Error as e:
            self.logger.log_error(e, context="establishing connection")
            raise
        finally:
            if conn:
                conn.close()

    @contextmanager
    def transaction(self):
        """Context manager để quản lý transaction"""
        with self.connection() as conn:
            try:
                # Bắt đầu transaction
                start_time = time.time()
                self.logger.log_transaction("START")

                yield conn

                # Commit transaction nếu không có lỗi
                conn.commit()
                duration = time.time() - start_time
                self.logger.log_transaction("COMMIT", duration=duration)

            except Exception as e:
                # Rollback transaction nếu có lỗi
                conn.rollback()
                duration = time.time() - start_time
                self.logger.log_transaction("ROLLBACK", duration=duration)

                # Log lỗi
                self.logger.log_error(e, context="transaction")
                raise

    def execute_query(self, query, params=None):
        """Thực thi truy vấn và trả về kết quả"""
        with self.connection() as conn:
            try:
                start_time = time.time()
                cursor = conn.cursor()

                # Thực thi truy vấn
                if params:
                    cursor.execute(query, params)
                else:
                    cursor.execute(query)

                # Lấy kết quả nếu là truy vấn SELECT
                if query.strip().upper().startswith("SELECT"):
                    columns = [column[0] for column in cursor.description]
                    results = cursor.fetchall()
                    result_count = len(results)

                    # Tính thời gian thực thi
                    duration = time.time() - start_time

                    # Log truy vấn
                    self.logger.log_query(
                        query, params, duration=duration, result_count=result_count
                    )

                    # Trả về kết quả dưới dạng list of dict
                    return [dict(zip(columns, row)) for row in results]
                else:
                    # Đối với các truy vấn thay đổi dữ liệu
                    affected_rows = cursor.rowcount

                    # Tính thời gian thực thi
                    duration = time.time() - start_time

                    # Log truy vấn
                    self.logger.log_query(
                        query, params, duration=duration, result_count=affected_rows
                    )

                    # Commit thay đổi
                    conn.commit()

                    # Trả về số hàng bị ảnh hưởng
                    return affected_rows

            except Exception as e:
                # Log lỗi
                self.logger.log_error(e, query, params)
                raise

    def execute_many(self, query, params_list):
        """Thực thi nhiều truy vấn với danh sách tham số"""
        with self.transaction() as conn:
            try:
                start_time = time.time()
                cursor = conn.cursor()

                # Thực thi executemany
                cursor.executemany(query, params_list)

                # Tính thời gian thực thi
                duration = time.time() - start_time

                # Lấy số hàng bị ảnh hưởng
                affected_rows = cursor.rowcount

                # Log thông tin
                self.logger.log_query(
                    query, 
                    f"[{len(params_list)} parameter sets]", 
                    duration=duration, 
                    result_count=affected_rows
                )

                return affected_rows

            except Exception as e:
                # Log lỗi
                self.logger.log_error(e, query, f"[{len(params_list)} parameter sets]")
                raise

    def bulk_insert(self, table_name, data, batch_size=1000):
        """Thực hiện bulk insert với logging chi tiết"""
        if not data:
            return 0

        total_rows = len(data)
        total_batches = (total_rows + batch_size - 1) // batch_size
        total_inserted = 0
        start_total_time = time.time()

        self.logger.sql_logger.info(
            f"Bắt đầu bulk insert vào bảng {table_name}. "
            f"{total_rows} hàng, {total_batches} batch(es)"
        )

        try:
            # Lấy tên các cột từ dữ liệu
            if isinstance(data[0], dict):
                columns = list(data[0].keys())
                # Chuyển dữ liệu từ dict sang list
                data_values = [[row[col] for col in columns] for row in data]
            else:
                raise ValueError("Data phải là list of dict")

            # Xây dựng câu truy vấn insert
            placeholders = ','.join('?' for _ in columns)
            column_names = ','.join(columns)
            query = f"INSERT INTO {table_name} ({column_names}) VALUES ({placeholders})"

            # Thực hiện insert theo batch
            for i in range(0, total_rows, batch_size):
                batch_start_time = time.time()

                # Lấy một batch dữ liệu
                batch = data_values[i:i+batch_size]
                batch_size_actual = len(batch)

                # Thực hiện insert
                with self.transaction() as conn:
                    cursor = conn.cursor()
                    cursor.executemany(query, batch)
                    batch_affected = cursor.rowcount

                # Tính thời gian và log
                batch_duration = time.time() - batch_start_time
                total_inserted += batch_affected

                self.logger.sql_logger.info(
                    f"Batch {(i//batch_size)+1}/{total_batches}: "
                    f"Đã insert {batch_affected}/{batch_size_actual} hàng "
                    f"trong {batch_duration:.3f}s"
                )

            # Log tổng kết
            total_duration = time.time() - start_total_time
            self.logger.sql_logger.info(
                f"Hoàn thành bulk insert vào bảng {table_name}. "
                f"Tổng số: {total_inserted}/{total_rows} hàng "
                f"trong {total_duration:.3f}s"
            )

            return total_inserted

        except Exception as e:
            # Log lỗi
            self.logger.log_error(
                e, 
                context=f"bulk_insert to {table_name}, {total_rows} rows"
            )
            raise

5. Ứng dụng thực tế

5.1. Ví dụ sử dụng lớp Database với xử lý lỗi

# Ví dụ sử dụng lớp Database để thao tác với SQL Server
conn_string = 'DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password'

# Khởi tạo đối tượng Database
db = SQLServerDatabase(conn_string, app_name="SalesAnalytics", log_dir="logs/sales")

try:
    # Truy vấn đơn giản
    results = db.execute_query(
        "SELECT TOP 10 * FROM DuLieuBanHang WHERE NgayBan >= ?", 
        ['2024-01-01']
    )
    print(f"Lấy được {len(results)} kết quả")

    # Thực hiện insert với transaction
    with db.transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO DuLieuBanHang (NgayBan, MaSanPham, SoLuong, DoanhThu)
            VALUES (?, ?, ?, ?)
        """, ['2024-05-01', 'SP001', 5, 1500000])

        # Có thể thực hiện nhiều lệnh trong cùng một transaction
        cursor.execute("""
            UPDATE TongKetDoanhThu
            SET TongDoanhThu = TongDoanhThu + ?
            WHERE Thang = 5 AND Nam = 2024
        """, [1500000])

    # Bulk insert dữ liệu
    sales_data = [
        {
            'NgayBan': '2024-05-01',
            'MaSanPham': f'SP{i:03d}',
            'SoLuong': i % 10 + 1,
            'DoanhThu': (i % 10 + 1) * 300000
        }
        for i in range(1, 101)
    ]

    inserted = db.bulk_insert('DuLieuBanHang', sales_data, batch_size=20)
    print(f"Đã insert {inserted} hàng dữ liệu")

except SQLConnectionError as e:
    print(f"Lỗi kết nối: {e.message}")
    # Thử kết nối lại hoặc thông báo cho người dùng

except SQLConstraintError as e:
    print(f"Lỗi ràng buộc dữ liệu: {e.message}")
    # Kiểm tra và sửa dữ liệu

except SQLTimeoutError as e:
    print(f"Truy vấn bị timeout: {e.message}")
    # Tối ưu truy vấn hoặc tăng timeout

except SQLSyntaxError as e:
    print(f"Lỗi cú pháp SQL: {e.message}")
    # Sửa lỗi cú pháp trong truy vấn

except SQLServerError as e:
    print(f"Lỗi SQL Server: {e.message}")
    # Xử lý lỗi chung từ SQL Server

except Exception as e:
    print(f"Lỗi không xác định: {str(e)}")
    # Ghi log và thông báo lỗi chung

5.2. Xử lý lỗi khi làm việc với pandas và SQLAlchemy

import pandas as pd
from sqlalchemy import create_engine, text
import urllib
import logging

# Thiết lập logger
logger = logging.getLogger('pandas_sql')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('logs/pandas_sql.log')
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)

def create_sql_engine(server, database, username, password):
    """Tạo SQLAlchemy engine với xử lý lỗi"""
    try:
        # Tạo chuỗi kết nối
        params = urllib.parse.quote_plus(
            f"DRIVER={{SQL Server}};SERVER={server};"
            f"DATABASE={database};UID={username};PWD={password}"
        )

        # Tạo engine với cấu hình
        engine = create_engine(
            f"mssql+pyodbc:///?odbc_connect={params}", 
            pool_pre_ping=True,  # Kiểm tra kết nối trước khi sử dụng
            pool_recycle=3600,   # Làm mới kết nối sau 1 giờ
            connect_args={'timeout': 30}  # Timeout kết nối là 30 giây
        )

        # Kiểm tra kết nối
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1"))
            if result.scalar() == 1:
                logger.info(f"Kết nối thành công đến {server}/{database}")
                return engine
            else:
                raise Exception("Kiểm tra kết nối không thành công")

    except Exception as e:
        logger.error(f"Lỗi tạo kết nối SQL: {str(e)}")
        raise

def read_sql_with_logging(query, engine, params=None):
    """Đọc dữ liệu từ SQL với pandas và log"""
    try:
        start_time = time.time()
        logger.info(f"Bắt đầu đọc dữ liệu với query: {query[:200]}...")

        # Thực hiện truy vấn
        if params:
            df = pd.read_sql(query, engine, params=params)
        else:
            df = pd.read_sql(query, engine)

        # Log kết quả
        duration = time.time() - start_time
        row_count = len(df)
        col_count = len(df.columns)

        logger.info(
            f"Hoàn thành đọc dữ liệu: {row_count} hàng × {col_count} cột "
            f"trong {duration:.3f}s"
        )

        # Log thông tin về bộ nhớ sử dụng
        memory_usage = df.memory_usage(deep=True).sum()
        logger.info(f"Bộ nhớ sử dụng: {memory_usage/1024/1024:.2f} MB")

        return df

    except Exception as e:
        logger.error(f"Lỗi đọc dữ liệu SQL: {str(e)}")
        raise

def write_to_sql_with_logging(df, table_name, engine, if_exists='replace', chunksize=1000):
    """Ghi DataFrame vào SQL Server với logging"""
    try:
        start_time = time.time()
        row_count = len(df)
        logger.info(
            f"Bắt đầu ghi {row_count} hàng vào bảng {table_name}, "
            f"chế độ: {if_exists}, kích thước chunk: {chunksize}"
        )

        # Thực hiện ghi dữ liệu
        df.to_sql(
            table_name, 
            engine, 
            if_exists=if_exists, 
            chunksize=chunksize,
            index=False
        )

        # Log kết quả
        duration = time.time() - start_time
        logger.info(
            f"Hoàn thành ghi dữ liệu vào bảng {table_name}: "
            f"{row_count} hàng trong {duration:.3f}s"
        )

        return True

    except Exception as e:
        logger.error(f"Lỗi ghi dữ liệu vào SQL: {str(e)}")
        raise

5.3. Tích hợp với hệ thống giám sát

import requests
import socket
import json
import traceback
from datetime import datetime

class SQLMonitor:
    """Lớp giám sát SQL Server và gửi thông báo khi có lỗi"""

    def __init__(self, webhook_url=None, email_config=None):
        """Khởi tạo với URL webhook và cấu hình email"""
        self.webhook_url = webhook_url
        self.email_config = email_config
        self.hostname = socket.gethostname()
        self.logger = logging.getLogger('sql_monitor')

        # Thiết lập handler cho logger
        handler = logging.FileHandler('logs/sql_monitor.log')
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_and_alert(self, error, query=None, context=None, alert_level='warning'):
        """Ghi log lỗi và gửi thông báo"""
        # Tạo thông tin lỗi
        error_data = {
            'timestamp': datetime.now().isoformat(),
            'hostname': self.hostname,
            'error_type': type(error).__name__,
            'error_message': str(error),
            'stack_trace': traceback.format_exc(),
            'alert_level': alert_level
        }

        if query:
            error_data['query'] = query

        if context:
            error_data['context'] = context

        # Ghi log
        if alert_level == 'critical':
            self.logger.critical(f"SQL Critical Error: {json.dumps(error_data)}")
        elif alert_level == 'error':
            self.logger.error(f"SQL Error: {json.dumps(error_data)}")
        else:
            self.logger.warning(f"SQL Warning: {json.dumps(error_data)}")

        # Gửi thông báo
        if alert_level in ('error', 'critical'):
            self._send_alert(error_data)

    def _send_alert(self, error_data):
        """Gửi thông báo lỗi qua webhook và/hoặc email"""

        # Gửi qua webhook (ví dụ: Slack, Teams, etc.)
        if self.webhook_url:
            try:
                # Định dạng thông báo
                message = {
                    'text': f"SQL Error on {error_data['hostname']}",
                    'attachments': [{
                        'title': f"{error_data['error_type']}: {error_data['error_message']}",
                        'text': f"Context: {error_data.get('context', 'N/A')}n"
                               f"Query: {error_data.get('query', 'N/A')}n"
                               f"Time: {error_data['timestamp']}",
                        'color': 'danger' if error_data['alert_level'] == 'critical' else 'warning'
                    }]
                }

                # Gửi request
                response = requests.post(
                    self.webhook_url, 
                    json=message,
                    timeout=5
                )

                if response.status_code == 200:
                    self.logger.info("Đã gửi thông báo lỗi qua webhook")
                else:
                    self.logger.warning(
                        f"Không thể gửi thông báo qua webhook. "
                        f"Status code: {response.status_code}"
                    )

            except Exception as e:
                self.logger.error(f"Lỗi khi gửi thông báo webhook: {str(e)}")

        # Gửi qua email
        if self.email_config:
            try:
                import smtplib
                from email.mime.text import MIMEText
                from email.mime.multipart import MIMEMultipart

                # Tạo email
                msg = MIMEMultipart()
                msg['From'] = self.email_config['from']
                msg['To'] = self.email_config['to']
                msg['Subject'] = f"SQL Error on {error_data['hostname']}: {error_data['error_type']}"

                # Tạo nội dung
                body = f"""
                <h2>SQL Error Details</h2>
                <p><strong>Time:</strong> {error_data['timestamp']}</p>
                <p><strong>Host:</strong> {error_data['hostname']}</p>
                <p><strong>Error Type:</strong> {error_data['error_type']}</p>
                <p><strong>Error Message:</strong> {error_data['error_message']}</p>

                <h3>Context</h3>
                <p>{error_data.get('context', 'N/A')}</p>

                <h3>Query</h3>
                <pre>{error_data.get('query', 'N/A')}</pre>

                <h3>Stack Trace</h3>
                <pre>{error_data['stack_trace']}</pre>
                """

                msg.attach(MIMEText(body, 'html'))

                # Gửi email
                server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)
                server.quit()

                self.logger.info("Đã gửi thông báo lỗi qua email")

            except Exception as e:
                self.logger.error(f"Lỗi khi gửi email thông báo: {str(e)}")

5.4. Hệ thống theo dõi hiệu suất truy vấn SQL

class SQLPerformanceTracker:
    """Lớp theo dõi hiệu suất truy vấn SQL"""

    def __init__(self, log_dir='logs/performance'):
        """Khởi tạo với thư mục log"""
        self.log_dir = log_dir

        # Tạo thư mục nếu chưa tồn tại
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        # Thiết lập logger
        self.logger = logging.getLogger('sql_performance')
        handler = logging.FileHandler(os.path.join(log_dir, 'performance.log'))
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

        # Thống kê hiệu suất
        self.stats = {
            'total_queries': 0,
            'total_duration': 0,
            'max_duration': 0,
            'slow_queries': 0,
            'error_queries': 0,
            'query_types': {
                'SELECT': 0,
                'INSERT': 0,
                'UPDATE': 0,
                'DELETE': 0,
                'OTHER': 0
            }
        }

    def track_query(self, query, duration, result_count=None, error=None):
        """Theo dõi một truy vấn SQL"""
        try:
            # Cập nhật thống kê
            self.stats['total_queries'] += 1
            self.stats['total_duration'] += duration
            self.stats['max_duration'] = max(self.stats['max_duration'], duration)

            if duration > 1.0:  # Truy vấn chậm > 1 giây
                self.stats['slow_queries'] += 1

            if error:
                self.stats['error_queries'] += 1

            # Xác định loại truy vấn
            first_word = query.strip().upper().split()[0] if query.strip() else "OTHER"
            if first_word in self.stats['query_types']:
                self.stats['query_types'][first_word] += 1
            else:
                self.stats['query_types']['OTHER'] += 1

            # Log thông tin truy vấn
            log_entry = {
                'timestamp': datetime.now().isoformat(),
                'query_type': first_word,
                'duration': duration,
                'result_count': result_count,
                'has_error': error is not None,
                'query_preview': query[:100] + ('...' if len(query) > 100 else '')
            }

            self.logger.info(f"Query Stats: {json.dumps(log_entry)}")

            # Log chi tiết cho truy vấn chậm
            if duration > 1.0:
                slow_log_entry = {
                    'timestamp': datetime.now().isoformat(),
                    'duration': duration,
                    'query': query,
                    'result_count': result_count
                }

                with open(os.path.join(self.log_dir, 'slow_queries.log'), 'a') as f:
                    f.write(f"{json.dumps(slow_log_entry)}n")

            return log_entry

        except Exception as e:
            print(f"Lỗi khi theo dõi truy vấn: {str(e)}")

    def get_statistics(self):
        """Lấy thống kê hiệu suất"""
        stats = self.stats.copy()

        # Tính thời gian trung bình
        if stats['total_queries'] > 0:
            stats['avg_duration'] = stats['total_duration'] / stats['total_queries']
        else:
            stats['avg_duration'] = 0

        return stats

    def generate_report(self, output_file=None):
        """Tạo báo cáo hiệu suất"""
        stats = self.get_statistics()

        report = f"""
        SQL Performance Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        =============================================================

        Summary:
        --------
        Total Queries: {stats['total_queries']}
        Average Duration: {stats['avg_duration']:.6f} seconds
        Maximum Duration: {stats['max_duration']:.6f} seconds
        Slow Queries (>1s): {stats['slow_queries']} ({stats['slow_queries']/max(1, stats['total_queries'])*100:.2f}%)
        Error Queries: {stats['error_queries']} ({stats['error_queries']/max(1, stats['total_queries'])*100:.2f}%)

        Query Types:
        ------------
        SELECT: {stats['query_types']['SELECT']} ({stats['query_types']['SELECT']/max(1, stats['total_queries'])*100:.2f}%)
        INSERT: {stats['query_types']['INSERT']} ({stats['query_types']['INSERT']/max(1, stats['total_queries'])*100:.2f}%)
        UPDATE: {stats['query_types']['UPDATE']} ({stats['query_types']['UPDATE']/max(1, stats['total_queries'])*100:.2f}%)
        DELETE: {stats['query_types']['DELETE']} ({stats['query_types']['DELETE']/max(1, stats['total_queries'])*100:.2f}%)
        OTHER: {stats['query_types']['OTHER']} ({stats['query_types']['OTHER']/max(1, stats['total_queries'])*100:.2f}%)
        """

        if output_file:
            with open(output_file, 'w') as f:
                f.write(report)

        return report

6. Tích hợp vào hệ thống trực tiếp

6.1. Tạo decorators cho xử lý lỗi tự động

from functools import wraps
import time
import traceback

def log_sql_operation(logger):
    """Decorator để log các thao tác SQL"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Lấy tên hàm
            func_name = func.__name__
            start_time = time.time()

            try:
                # Thực thi hàm
                logger.info(f"Bắt đầu thực thi {func_name}")
                result = func(*args, **kwargs)

                # Tính thời gian thực thi
                duration = time.time() - start_time

                # Log kết quả thành công
                logger.info(f"Thực thi {func_name} thành công trong {duration:.3f}s")

                return result

            except Exception as e:
                # Tính thời gian thực thi
                duration = time.time() - start_time

                # Log lỗi
                error_type = type(e).__name__
                error_message = str(e)
                stack_trace = traceback.format_exc()

                logger.error(
                    f"Lỗi khi thực thi {func_name}: {error_type} - {error_message}n"
                    f"Thời gian: {duration:.3f}sn"
                    f"Stack trace: {stack_trace}"
                )

                # Ném lại exception
                raise

        return wrapper
    return decorator

def retry_on_specific_errors(max_attempts=3, delay=2, error_types=(Exception,)):
    """Decorator để thử lại khi gặp lỗi cụ thể"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            last_exception = None

            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except error_types as e:
                    attempts += 1
                    last_exception = e

                    # Tăng thời gian chờ theo số lần thử
                    wait_time = delay * attempts

                    if attempts < max_attempts:
                        logging.warning(
                            f"Lỗi khi thực thi {func.__name__}: {str(e)}n"
                            f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây"
                        )
                        time.sleep(wait_time)
                    else:
                        logging.error(
                            f"Đã thử lại {max_attempts} lần nhưng vẫn thất bại: {str(e)}"
                        )

            # Nếu đã thử hết số lần mà vẫn lỗi
            raise last_exception

        return wrapper
    return decorator

6.2. Tích hợp với FastAPI hoặc Flask

from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import logging
import time

# Thiết lập logger
logger = logging.getLogger("api_sql_operations")
logger.setLevel(logging.INFO)
handler = logging.FileHandler("logs/api_sql.log")
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)

# Khởi tạo FastAPI app
app = FastAPI(title="SQL Operations API")

# Khởi tạo đối tượng Database
db = SQLServerDatabase(
    conn_string='DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password',
    app_name="API_Service",
    log_dir="logs/api"
)

# Model cho request
class DataQuery(BaseModel):
    query: str
    params: list = None

# Middleware để log request và response
@app.middleware("http")
async def log_requests(request, call_next):
    start_time = time.time()

    # Log request
    logger.info(f"Request: {request.method} {request.url}")

    try:
        # Xử lý request
        response = await call_next(request)

        # Tính thời gian xử lý
        duration = time.time() - start_time

        # Log response
        logger.info(f"Response: {response.status_code} in {duration:.3f}s")

        return response

    except Exception as e:
        # Log lỗi
        duration = time.time() - start_time
        logger.error(f"Error: {str(e)} in {duration:.3f}s")

        # Trả về lỗi 500
        return JSONResponse(
            status_code=500,
            content={"detail": "Internal Server Error"}
        )

# Endpoint để thực thi truy vấn SELECT
@app.post("/query/select")
async def execute_select(query_data: DataQuery):
    try:
        # Log truy vấn
        logger.info(f"Executing SELECT query: {query_data.query[:100]}...")

        # Kiểm tra xem có phải truy vấn SELECT không
        if not query_data.query.strip().upper().startswith("SELECT"):
            raise HTTPException(
                status_code=400,
                detail="Only SELECT queries are allowed for this endpoint"
            )

        # Thực thi truy vấn
        start_time = time.time()
        results = db.execute_query(query_data.query, query_data.params)
        duration = time.time() - start_time

        # Log kết quả
        logger.info(f"Query executed in {duration:.3f}s, returned {len(results)} rows")

        return {
            "success": True,
            "duration": duration,
            "row_count": len(results),
            "results": results
        }

    except SQLServerError as e:
        # Log lỗi
        logger.error(f"SQL Error: {e.message}")

        # Trả về lỗi
        raise HTTPException(
            status_code=400,
            detail=f"SQL Error: {e.message}"
        )

    except Exception as e:
        # Log lỗi không xác định
        logger.error(f"Unexpected error: {str(e)}")

        # Trả về lỗi 500
        raise HTTPException(
            status_code=500,
            detail="Internal server error"
        )

Kết luận

Bắt lỗi và log chi tiết SQL Server

Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python là một phần không thể thiếu trong việc xây dựng các ứng dụng dữ liệu chuyên nghiệp, đáng tin cậy. Một hệ thống xử lý lỗi và ghi log tốt không chỉ giúp phát hiện và khắc phục sự cố nhanh chóng mà còn cung cấp thông tin quý giá để tối ưu hóa hiệu suất và độ tin cậy của hệ thống.

Các nguyên tắc quan trọng cần nhớ:

  1. Luôn xử lý lỗi một cách cụ thể: Phân loại và xử lý từng loại lỗi riêng biệt thay vì bắt tất cả các lỗi cùng một cách.
  2. Ghi log có cấu trúc và chi tiết: Bao gồm thông tin về thời gian, ngữ cảnh, truy vấn và tham số để dễ dàng phân tích.
  3. Sử dụng các cấp độ log phù hợp: DEBUG, INFO, WARNING, ERROR, CRITICAL cho từng loại thông tin khác nhau.
  4. Áp dụng log rotation: Ngăn chặn các file log quá lớn và khó quản lý.
  5. Tích hợp với hệ thống giám sát: Gửi thông báo khi có lỗi nghiêm trọng để xử lý kịp thời.
  6. Theo dõi hiệu suất truy vấn: Phát hiện và tối ưu các truy vấn chậm.

Với các kỹ thuật và công cụ được trình bày trong bài viết này, bạn có thể xây dựng một hệ thống logging và xử lý lỗi toàn diện, giúp ứng dụng của bạn trở nên ổn định, dễ bảo trì và hiệu quả hơn khi làm việc với SQL Server từ Python.