Skip to Content

Tự động hóa & Machine Learning: Lập lịch job ETL (trích xuất-dữ liệu-chuyển đổi) từ SQL Server sang Excel bằng Python

Tự động hóa & Machine Learning: Lập lịch job ETL (trích xuất-dữ liệu-chuyển đổi) từ SQL Server sang Excel bằng Python

Lập lịch job ETL (trích xuất-dữ liệu-chuyển đổi) từ SQL Server sang Excel bằng Python

Giới thiệu

Trong thời đại số hóa hiện nay, việc tự động hóa quy trình trích xuất, biến đổi và tải dữ liệu (ETL) từ các cơ sở dữ liệu như SQL Server sang các định dạng dễ sử dụng như Excel đóng vai trò quan trọng trong hoạt động phân tích dữ liệu của doanh nghiệp. Bài viết này sẽ hướng dẫn chi tiết cách thiết lập quy trình ETL tự động từ SQL Server sang Excel sử dụng Python, giúp doanh nghiệp tiết kiệm thời gian và nguồn lực đáng kể.

Tổng quan về quy trình ETL tự động

Quy trình ETL tự động từ SQL Server sang Excel sử dụng Python bao gồm các bước chính sau:

  1. Trích xuất (Extract): Lấy dữ liệu từ SQL Server
  2. Biến đổi (Transform): Xử lý và làm sạch dữ liệu
  3. Tải (Load): Đưa dữ liệu vào file Excel
  4. Lập lịch (Schedule): Tự động hóa toàn bộ quy trình theo lịch định sẵn

1. Chuẩn bị môi trường và thư viện

Để bắt đầu, cần cài đặt các thư viện Python cần thiết:

# Cài đặt các thư viện cần thiết
# pip install pyodbc pandas openpyxl schedule

Sau đó, import các thư viện:

import pandas as pd
import pyodbc
import os
import datetime
import time
import schedule
import logging
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows

2. Thiết lập kết nối đến SQL Server

def tao_ket_noi_sql_server():
    """Tạo kết nối đến SQL Server"""
    try:
        conn = pyodbc.connect(
            'DRIVER={SQL Server};'
            'SERVER=ten_server;'
            'DATABASE=ten_database;'
            'UID=ten_dang_nhap;'
            'PWD=mat_khau'
        )
        return conn
    except Exception as e:
        logging.error(f"Lỗi kết nối đến SQL Server: {str(e)}")
        return None

3. Trích xuất dữ liệu từ SQL Server

def trich_xuat_du_lieu(query, params=None):
    """Trích xuất dữ liệu từ SQL Server sử dụng truy vấn"""
    try:
        conn = tao_ket_noi_sql_server()
        if conn is None:
            return None
        
        if params:
            df = pd.read_sql(query, conn, params=params)
        else:
            df = pd.read_sql(query, conn)
        
        conn.close()
        return df
    except Exception as e:
        logging.error(f"Lỗi trích xuất dữ liệu: {str(e)}")
        return None

4. Biến đổi dữ liệu

def bien_doi_du_lieu(df):
    """Xử lý và làm sạch dữ liệu"""
    if df is None or df.empty:
        return None
    
    try:
        # Loại bỏ các dòng trùng lặp
        df = df.drop_duplicates()
        
        # Làm sạch dữ liệu - ví dụ: điền giá trị thiếu
        df = df.fillna({
            'TenCot1': 'Không có dữ liệu',
            'TenCot2': 0,
            'TenCot3': df['TenCot3'].mean() if 'TenCot3' in df.columns else 0
        })
        
        # Chuyển đổi kiểu dữ liệu nếu cần
        if 'NgayThang' in df.columns:
            df['NgayThang'] = pd.to_datetime(df['NgayThang'])
        
        # Tính toán các trường phụ nếu cần
        if 'DoanhThu' in df.columns and 'ChiPhi' in df.columns:
            df['LoiNhuan'] = df['DoanhThu'] - df['ChiPhi']
            
        # Sắp xếp dữ liệu
        if 'NgayThang' in df.columns:
            df = df.sort_values(by='NgayThang', ascending=False)
            
        return df
    except Exception as e:
        logging.error(f"Lỗi biến đổi dữ liệu: {str(e)}")
        return df  # Trả về dữ liệu gốc nếu có lỗi

5. Tạo file Excel với định dạng chuyên nghiệp

def tao_file_excel(df, ten_file, ten_sheet="DuLieu"):
    """Xuất dữ liệu ra file Excel với định dạng đẹp"""
    if df is None or df.empty:
        logging.warning("Không có dữ liệu để xuất ra Excel")
        return False
    
    try:
        # Tạo workbook mới
        wb = Workbook()
        ws = wb.active
        ws.title = ten_sheet
        
        # Thêm tiêu đề
        ws.append(['BÁO CÁO DỮ LIỆU'])
        ws.append([f'Ngày tạo: {datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")}'])
        ws.append([])  # Dòng trống
        
        # Thêm dữ liệu từ DataFrame
        rows = dataframe_to_rows(df, index=False, header=True)
        for r_idx, row in enumerate(rows, 4):  # Bắt đầu từ dòng 4
            ws.append(row)
            
        # Định dạng tiêu đề
        ws.merge_cells('A1:' + chr(65 + len(df.columns) - 1) + '1')
        cell = ws['A1']
        cell.font = Font(size=16, bold=True)
        cell.alignment = Alignment(horizontal='center')
        
        # Định dạng ngày tạo
        ws.merge_cells('A2:' + chr(65 + len(df.columns) - 1) + '2')
        cell = ws['A2']
        cell.font = Font(italic=True)
        cell.alignment = Alignment(horizontal='center')
        
        # Định dạng header
        header_row = 4  # Dòng header
        for col in range(1, len(df.columns) + 1):
            cell = ws.cell(row=header_row, column=col)
            cell.font = Font(bold=True)
            cell.fill = PatternFill(start_color="DDDDDD", end_color="DDDDDD", fill_type="solid")
            cell.alignment = Alignment(horizontal='center')
            
        # Tự động điều chỉnh độ rộng cột
        for col in ws.columns:
            max_length = 0
            column = col[0].column_letter
            for cell in col:
                if cell.value:
                    max_length = max(max_length, len(str(cell.value)))
            adjusted_width = (max_length + 2) * 1.2
            ws.column_dimensions[column].width = min(adjusted_width, 50)  # Giới hạn độ rộng tối đa
            
        # Lưu file
        wb.save(ten_file)
        logging.info(f"Đã xuất dữ liệu ra file Excel: {ten_file}")
        return True
    except Exception as e:
        logging.error(f"Lỗi khi tạo file Excel: {str(e)}")
        return False

6. Quy trình ETL đầy đủ

def etl_process(ten_file_excel=None):
    """Thực hiện toàn bộ quy trình ETL"""
    # Thiết lập logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        filename='etl_log.log'
    )
    
    # Nếu không cung cấp tên file, tạo tên file theo ngày hiện tại
    if ten_file_excel is None:
        ngay_hien_tai = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        ten_file_excel = f"BaoCao_{ngay_hien_tai}.xlsx"
    
    logging.info(f"Bắt đầu quy trình ETL, xuất ra file: {ten_file_excel}")
    
    # 1. Trích xuất dữ liệu
    query = """
    SELECT
        p.ProductID,
        p.Name AS TenSanPham,
        p.ProductNumber,
        pc.Name AS DanhMuc,
        p.ListPrice AS GiaNiemYet,
        p.StandardCost AS ChiPhi,
        ISNULL(SUM(sod.OrderQty), 0) AS SoLuongBan,
        ISNULL(SUM(sod.LineTotal), 0) AS DoanhThu
    FROM
        Production.Product p
    LEFT JOIN
        Sales.SalesOrderDetail sod ON p.ProductID = sod.ProductID
    LEFT JOIN
        Production.ProductSubcategory ps ON p.ProductSubcategoryID = ps.ProductSubcategoryID
    LEFT JOIN
        Production.ProductCategory pc ON ps.ProductCategoryID = pc.ProductCategoryID
    WHERE
        p.SellEndDate IS NULL
    GROUP BY
        p.ProductID, p.Name, p.ProductNumber, pc.Name, p.ListPrice, p.StandardCost
    ORDER BY
        DoanhThu DESC
    """
    
    df = trich_xuat_du_lieu(query)
    
    if df is None:
        logging.error("Không thể trích xuất dữ liệu từ SQL Server, quy trình ETL bị hủy")
        return False
    
    # 2. Biến đổi dữ liệu
    df_transformed = bien_doi_du_lieu(df)
    
    if df_transformed is None:
        logging.error("Không thể biến đổi dữ liệu, quy trình ETL bị hủy")
        return False
    
    # 3. Tạo file Excel
    result = tao_file_excel(df_transformed, ten_file_excel, "BaoCaoSanPham")
    
    if result:
        logging.info("Quy trình ETL hoàn tất thành công!")
        return True
    else:
        logging.error("Quy trình ETL thất bại!")
        return False

7. Lập lịch tự động chạy ETL

def lap_lich_etl():
    """Thiết lập lịch chạy tự động cho quy trình ETL"""
    # Thiết lập báo cáo hàng ngày lúc 8 giờ sáng
    schedule.every().day.at("08:00").do(etl_process, f"BaoCaoHangNgay_{datetime.datetime.now().strftime('%Y%m%d')}.xlsx")
    
    # Thiết lập báo cáo hàng tuần vào ngày thứ Hai
    schedule.every().monday.at("07:00").do(etl_process, f"BaoCaoTuan_{datetime.datetime.now().strftime('%Y%m%d')}.xlsx")
    
    # Thiết lập báo cáo hàng tháng vào ngày đầu tiên của tháng
    schedule.every().day.at("06:00").do(lambda: 
        etl_process(f"BaoCaoThang_{datetime.datetime.now().strftime('%Y%m')}.xlsx") 
        if datetime.datetime.now().day == 1 else None
    )
    
    logging.info("Đã thiết lập lịch tự động chạy ETL")
    
    # Vòng lặp vô hạn để duy trì lịch
    print("Chương trình lập lịch ETL đang chạy...")
    print("Nhấn Ctrl+C để dừng")
    
    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # Kiểm tra mỗi phút
    except KeyboardInterrupt:
        print("Đã dừng lập lịch ETL")
        logging.info("Đã dừng lịch tự động chạy ETL")

8. Chạy chương trình

if __name__ == "__main__":
    # Thiết lập logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("etl_scheduler.log"),
            logging.StreamHandler()
        ]
    )
    
    # Bắt đầu lập lịch
    lap_lich_etl()

9. Cấu hình ETL chạy tự động khi khởi động hệ thống

Để đảm bảo quy trình ETL luôn hoạt động, ngay cả khi máy tính khởi động lại, chúng ta có thể thiết lập script Python này để tự động chạy khi hệ thống khởi động.

Trên Windows:

  1. Tạo file batch (.bat) với nội dung:
@echo off
python D:\duong_dan\den\script\etl_scheduler.py
  1. Thêm shortcut của file batch này vào thư mục Startup của Windows.

Trên Linux:

  1. Tạo service file:
sudo nano /etc/systemd/system/etl-scheduler.service
  1. Thêm nội dung:
[Unit]
Description=ETL Scheduler Service
After=network.target

[Service]
User=username
WorkingDirectory=/duong/dan/den/thu/muc
ExecStart=/usr/bin/python3 /duong/dan/den/script/etl_scheduler.py
Restart=always

[Install]
WantedBy=multi-user.target
  1. Kích hoạt service:
sudo systemctl enable etl-scheduler.service
sudo systemctl start etl-scheduler.service

10. Giám sát và thông báo

Để giám sát quy trình ETL và nhận thông báo khi có lỗi, có thể bổ sung tính năng gửi email hoặc tin nhắn:

def gui_thong_bao(tieu_de, noi_dung):
    """Gửi email thông báo"""
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    
    # Thông tin email
    email_gui = "your_email@gmail.com"
    mat_khau = "your_password"
    email_nhan = "recipient@example.com"
    
    # Tạo tin nhắn
    msg = MIMEMultipart()
    msg['From'] = email_gui
    msg['To'] = email_nhan
    msg['Subject'] = tieu_de
    
    msg.attach(MIMEText(noi_dung, 'plain'))
    
    # Gửi email
    try:
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(email_gui, mat_khau)
        text = msg.as_string()
        server.sendmail(email_gui, email_nhan, text)
        server.quit()
        return True
    except Exception as e:
        logging.error(f"Lỗi gửi email: {str(e)}")
        return False

11. Xử lý trường hợp đặc biệt

Để nâng cao độ tin cậy, cần xử lý các trường hợp đặc biệt như mất kết nối SQL Server, hệ thống tạm dừng, v.v.:

def etl_process_with_retry(ten_file_excel=None, so_lan_thu=3, thoi_gian_cho=300):
    """Thực hiện ETL với cơ chế thử lại nếu thất bại"""
    for lan_thu in range(so_lan_thu):
        try:
            result = etl_process(ten_file_excel)
            if result:
                return True
                
            logging.warning(f"ETL thất bại lần {lan_thu + 1}/{so_lan_thu}, chờ {thoi_gian_cho} giây trước khi thử lại...")
            time.sleep(thoi_gian_cho)
        except Exception as e:
            logging.error(f"Lỗi trong quá trình ETL lần {lan_thu + 1}: {str(e)}")
            time.sleep(thoi_gian_cho)
            
    # Nếu đã thử hết số lần mà vẫn thất bại, gửi thông báo
    gui_thong_bao(
        "ETL thất bại sau nhiều lần thử", 
        f"Quy trình ETL không thể hoàn tất sau {so_lan_thu} lần thử. Vui lòng kiểm tra hệ thống."
    )
    return False

Kết luận

Tự động hóa quy trình ETL từ SQL Server sang Excel bằng Python mang lại nhiều lợi ích cho doanh nghiệp:

  1. Tiết kiệm thời gian: Giảm thiểu thời gian thủ công cho việc xuất báo cáo định kỳ
  2. Tăng độ chính xác: Loại bỏ lỗi do con người khi xử lý dữ liệu thủ công
  3. Tăng năng suất: Nhân viên có thể tập trung vào phân tích dữ liệu thay vì tổng hợp dữ liệu
  4. Tính nhất quán: Đảm bảo báo cáo luôn được tạo theo cùng một định dạng và tiêu chuẩn
  5. Khả năng mở rộng: Dễ dàng điều chỉnh quy trình để bổ sung thêm nguồn dữ liệu hoặc định dạng đầu ra

Bằng cách kết hợp sức mạnh của SQL Server, khả năng xử lý dữ liệu của Python và tính phổ biến của Excel, doanh nghiệp có thể xây dựng một hệ thống báo cáo tự động, hiệu quả, đáp ứng nhu cầu phân tích dữ liệu hiện đại.

Thông qua việc áp dụng các nguyên tắc và kỹ thuật được trình bày trong bài viết này, các doanh nghiệp có thể nâng cao năng lực quản lý dữ liệu và tối ưu hóa quy trình ra quyết định dựa trên dữ liệu. 1

/* Tối ưu font, khoảng cách và màu chủ đạo */ body { font-family: 'Inter', sans-serif; color: #2e3a59; } h1, h2, h3 { color: #2a7a4d; /* màu xanh giống Docusaurus */ font-weight: 700; } a { color: #2a7a4d; text-decoration: none; } a:hover { text-decoration: underline; } /* Bo tròn và đổ bóng cho khối nội dung */ .card, .oe_structure { border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.05); padding: 1.5rem; }