Tự động hóa & Machine Learning: Lập lịch job ETL (trích xuất-dữ liệu-chuyển đổi) từ SQL Server sang Excel bằng Python
Lập lịch job ETL (trích xuất-dữ liệu-chuyển đổi) từ SQL Server sang Excel bằng Python
Giới thiệu
Trong thời đại số hóa hiện nay, việc tự động hóa quy trình trích xuất, biến đổi và tải dữ liệu (ETL) từ các cơ sở dữ liệu như SQL Server sang các định dạng dễ sử dụng như Excel đóng vai trò quan trọng trong hoạt động phân tích dữ liệu của doanh nghiệp. Bài viết này sẽ hướng dẫn chi tiết cách thiết lập quy trình ETL tự động từ SQL Server sang Excel sử dụng Python, giúp doanh nghiệp tiết kiệm thời gian và nguồn lực đáng kể.
Tổng quan về quy trình ETL tự động
Quy trình ETL tự động từ SQL Server sang Excel sử dụng Python bao gồm các bước chính sau:
- Trích xuất (Extract): Lấy dữ liệu từ SQL Server
- Biến đổi (Transform): Xử lý và làm sạch dữ liệu
- Tải (Load): Đưa dữ liệu vào file Excel
- Lập lịch (Schedule): Tự động hóa toàn bộ quy trình theo lịch định sẵn
1. Chuẩn bị môi trường và thư viện
Để bắt đầu, cần cài đặt các thư viện Python cần thiết:
# Cài đặt các thư viện cần thiết # pip install pyodbc pandas openpyxl schedule
Sau đó, import các thư viện:
import pandas as pd import pyodbc import os import datetime import time import schedule import logging from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill from openpyxl.utils.dataframe import dataframe_to_rows
2. Thiết lập kết nối đến SQL Server
def tao_ket_noi_sql_server(): """Tạo kết nối đến SQL Server""" try: conn = pyodbc.connect( 'DRIVER={SQL Server};' 'SERVER=ten_server;' 'DATABASE=ten_database;' 'UID=ten_dang_nhap;' 'PWD=mat_khau' ) return conn except Exception as e: logging.error(f"Lỗi kết nối đến SQL Server: {str(e)}") return None
3. Trích xuất dữ liệu từ SQL Server
def trich_xuat_du_lieu(query, params=None): """Trích xuất dữ liệu từ SQL Server sử dụng truy vấn""" try: conn = tao_ket_noi_sql_server() if conn is None: return None if params: df = pd.read_sql(query, conn, params=params) else: df = pd.read_sql(query, conn) conn.close() return df except Exception as e: logging.error(f"Lỗi trích xuất dữ liệu: {str(e)}") return None
4. Biến đổi dữ liệu
def bien_doi_du_lieu(df): """Xử lý và làm sạch dữ liệu""" if df is None or df.empty: return None try: # Loại bỏ các dòng trùng lặp df = df.drop_duplicates() # Làm sạch dữ liệu - ví dụ: điền giá trị thiếu df = df.fillna({ 'TenCot1': 'Không có dữ liệu', 'TenCot2': 0, 'TenCot3': df['TenCot3'].mean() if 'TenCot3' in df.columns else 0 }) # Chuyển đổi kiểu dữ liệu nếu cần if 'NgayThang' in df.columns: df['NgayThang'] = pd.to_datetime(df['NgayThang']) # Tính toán các trường phụ nếu cần if 'DoanhThu' in df.columns and 'ChiPhi' in df.columns: df['LoiNhuan'] = df['DoanhThu'] - df['ChiPhi'] # Sắp xếp dữ liệu if 'NgayThang' in df.columns: df = df.sort_values(by='NgayThang', ascending=False) return df except Exception as e: logging.error(f"Lỗi biến đổi dữ liệu: {str(e)}") return df # Trả về dữ liệu gốc nếu có lỗi
5. Tạo file Excel với định dạng chuyên nghiệp
def tao_file_excel(df, ten_file, ten_sheet="DuLieu"): """Xuất dữ liệu ra file Excel với định dạng đẹp""" if df is None or df.empty: logging.warning("Không có dữ liệu để xuất ra Excel") return False try: # Tạo workbook mới wb = Workbook() ws = wb.active ws.title = ten_sheet # Thêm tiêu đề ws.append(['BÁO CÁO DỮ LIỆU']) ws.append([f'Ngày tạo: {datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")}']) ws.append([]) # Dòng trống # Thêm dữ liệu từ DataFrame rows = dataframe_to_rows(df, index=False, header=True) for r_idx, row in enumerate(rows, 4): # Bắt đầu từ dòng 4 ws.append(row) # Định dạng tiêu đề ws.merge_cells('A1:' + chr(65 + len(df.columns) - 1) + '1') cell = ws['A1'] cell.font = Font(size=16, bold=True) cell.alignment = Alignment(horizontal='center') # Định dạng ngày tạo ws.merge_cells('A2:' + chr(65 + len(df.columns) - 1) + '2') cell = ws['A2'] cell.font = Font(italic=True) cell.alignment = Alignment(horizontal='center') # Định dạng header header_row = 4 # Dòng header for col in range(1, len(df.columns) + 1): cell = ws.cell(row=header_row, column=col) cell.font = Font(bold=True) cell.fill = PatternFill(start_color="DDDDDD", end_color="DDDDDD", fill_type="solid") cell.alignment = Alignment(horizontal='center') # Tự động điều chỉnh độ rộng cột for col in ws.columns: max_length = 0 column = col[0].column_letter for cell in col: if cell.value: max_length = max(max_length, len(str(cell.value))) adjusted_width = (max_length + 2) * 1.2 ws.column_dimensions[column].width = min(adjusted_width, 50) # Giới hạn độ rộng tối đa # Lưu file wb.save(ten_file) logging.info(f"Đã xuất dữ liệu ra file Excel: {ten_file}") return True except Exception as e: logging.error(f"Lỗi khi tạo file Excel: {str(e)}") return False
6. Quy trình ETL đầy đủ
def etl_process(ten_file_excel=None): """Thực hiện toàn bộ quy trình ETL""" # Thiết lập logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename='etl_log.log' ) # Nếu không cung cấp tên file, tạo tên file theo ngày hiện tại if ten_file_excel is None: ngay_hien_tai = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") ten_file_excel = f"BaoCao_{ngay_hien_tai}.xlsx" logging.info(f"Bắt đầu quy trình ETL, xuất ra file: {ten_file_excel}") # 1. Trích xuất dữ liệu query = """ SELECT p.ProductID, p.Name AS TenSanPham, p.ProductNumber, pc.Name AS DanhMuc, p.ListPrice AS GiaNiemYet, p.StandardCost AS ChiPhi, ISNULL(SUM(sod.OrderQty), 0) AS SoLuongBan, ISNULL(SUM(sod.LineTotal), 0) AS DoanhThu FROM Production.Product p LEFT JOIN Sales.SalesOrderDetail sod ON p.ProductID = sod.ProductID LEFT JOIN Production.ProductSubcategory ps ON p.ProductSubcategoryID = ps.ProductSubcategoryID LEFT JOIN Production.ProductCategory pc ON ps.ProductCategoryID = pc.ProductCategoryID WHERE p.SellEndDate IS NULL GROUP BY p.ProductID, p.Name, p.ProductNumber, pc.Name, p.ListPrice, p.StandardCost ORDER BY DoanhThu DESC """ df = trich_xuat_du_lieu(query) if df is None: logging.error("Không thể trích xuất dữ liệu từ SQL Server, quy trình ETL bị hủy") return False # 2. Biến đổi dữ liệu df_transformed = bien_doi_du_lieu(df) if df_transformed is None: logging.error("Không thể biến đổi dữ liệu, quy trình ETL bị hủy") return False # 3. Tạo file Excel result = tao_file_excel(df_transformed, ten_file_excel, "BaoCaoSanPham") if result: logging.info("Quy trình ETL hoàn tất thành công!") return True else: logging.error("Quy trình ETL thất bại!") return False
7. Lập lịch tự động chạy ETL
def lap_lich_etl(): """Thiết lập lịch chạy tự động cho quy trình ETL""" # Thiết lập báo cáo hàng ngày lúc 8 giờ sáng schedule.every().day.at("08:00").do(etl_process, f"BaoCaoHangNgay_{datetime.datetime.now().strftime('%Y%m%d')}.xlsx") # Thiết lập báo cáo hàng tuần vào ngày thứ Hai schedule.every().monday.at("07:00").do(etl_process, f"BaoCaoTuan_{datetime.datetime.now().strftime('%Y%m%d')}.xlsx") # Thiết lập báo cáo hàng tháng vào ngày đầu tiên của tháng schedule.every().day.at("06:00").do(lambda: etl_process(f"BaoCaoThang_{datetime.datetime.now().strftime('%Y%m')}.xlsx") if datetime.datetime.now().day == 1 else None ) logging.info("Đã thiết lập lịch tự động chạy ETL") # Vòng lặp vô hạn để duy trì lịch print("Chương trình lập lịch ETL đang chạy...") print("Nhấn Ctrl+C để dừng") try: while True: schedule.run_pending() time.sleep(60) # Kiểm tra mỗi phút except KeyboardInterrupt: print("Đã dừng lập lịch ETL") logging.info("Đã dừng lịch tự động chạy ETL")
8. Chạy chương trình
if __name__ == "__main__": # Thiết lập logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("etl_scheduler.log"), logging.StreamHandler() ] ) # Bắt đầu lập lịch lap_lich_etl()
9. Cấu hình ETL chạy tự động khi khởi động hệ thống
Để đảm bảo quy trình ETL luôn hoạt động, ngay cả khi máy tính khởi động lại, chúng ta có thể thiết lập script Python này để tự động chạy khi hệ thống khởi động.
Trên Windows:
- Tạo file batch (.bat) với nội dung:
@echo off python D:\duong_dan\den\script\etl_scheduler.py
- Thêm shortcut của file batch này vào thư mục Startup của Windows.
Trên Linux:
- Tạo service file:
sudo nano /etc/systemd/system/etl-scheduler.service
- Thêm nội dung:
[Unit] Description=ETL Scheduler Service After=network.target [Service] User=username WorkingDirectory=/duong/dan/den/thu/muc ExecStart=/usr/bin/python3 /duong/dan/den/script/etl_scheduler.py Restart=always [Install] WantedBy=multi-user.target
- Kích hoạt service:
sudo systemctl enable etl-scheduler.service sudo systemctl start etl-scheduler.service
10. Giám sát và thông báo
Để giám sát quy trình ETL và nhận thông báo khi có lỗi, có thể bổ sung tính năng gửi email hoặc tin nhắn:
def gui_thong_bao(tieu_de, noi_dung): """Gửi email thông báo""" import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart # Thông tin email email_gui = "your_email@gmail.com" mat_khau = "your_password" email_nhan = "recipient@example.com" # Tạo tin nhắn msg = MIMEMultipart() msg['From'] = email_gui msg['To'] = email_nhan msg['Subject'] = tieu_de msg.attach(MIMEText(noi_dung, 'plain')) # Gửi email try: server = smtplib.SMTP('smtp.gmail.com', 587) server.starttls() server.login(email_gui, mat_khau) text = msg.as_string() server.sendmail(email_gui, email_nhan, text) server.quit() return True except Exception as e: logging.error(f"Lỗi gửi email: {str(e)}") return False
11. Xử lý trường hợp đặc biệt
Để nâng cao độ tin cậy, cần xử lý các trường hợp đặc biệt như mất kết nối SQL Server, hệ thống tạm dừng, v.v.:
def etl_process_with_retry(ten_file_excel=None, so_lan_thu=3, thoi_gian_cho=300): """Thực hiện ETL với cơ chế thử lại nếu thất bại""" for lan_thu in range(so_lan_thu): try: result = etl_process(ten_file_excel) if result: return True logging.warning(f"ETL thất bại lần {lan_thu + 1}/{so_lan_thu}, chờ {thoi_gian_cho} giây trước khi thử lại...") time.sleep(thoi_gian_cho) except Exception as e: logging.error(f"Lỗi trong quá trình ETL lần {lan_thu + 1}: {str(e)}") time.sleep(thoi_gian_cho) # Nếu đã thử hết số lần mà vẫn thất bại, gửi thông báo gui_thong_bao( "ETL thất bại sau nhiều lần thử", f"Quy trình ETL không thể hoàn tất sau {so_lan_thu} lần thử. Vui lòng kiểm tra hệ thống." ) return False
Kết luận
Tự động hóa quy trình ETL từ SQL Server sang Excel bằng Python mang lại nhiều lợi ích cho doanh nghiệp:
- Tiết kiệm thời gian: Giảm thiểu thời gian thủ công cho việc xuất báo cáo định kỳ
- Tăng độ chính xác: Loại bỏ lỗi do con người khi xử lý dữ liệu thủ công
- Tăng năng suất: Nhân viên có thể tập trung vào phân tích dữ liệu thay vì tổng hợp dữ liệu
- Tính nhất quán: Đảm bảo báo cáo luôn được tạo theo cùng một định dạng và tiêu chuẩn
- Khả năng mở rộng: Dễ dàng điều chỉnh quy trình để bổ sung thêm nguồn dữ liệu hoặc định dạng đầu ra
Bằng cách kết hợp sức mạnh của SQL Server, khả năng xử lý dữ liệu của Python và tính phổ biến của Excel, doanh nghiệp có thể xây dựng một hệ thống báo cáo tự động, hiệu quả, đáp ứng nhu cầu phân tích dữ liệu hiện đại.
Thông qua việc áp dụng các nguyên tắc và kỹ thuật được trình bày trong bài viết này, các doanh nghiệp có thể nâng cao năng lực quản lý dữ liệu và tối ưu hóa quy trình ra quyết định dựa trên dữ liệu. 1