Files
ewoooc/services/import_service.py
OoO 308efdce25
All checks were successful
CD Pipeline / deploy (push) Successful in 1m4s
chore(observability): clarify quick review completion copy
2026-05-06 19:49:28 +08:00

952 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自動匯入服務
負責從 Google Drive 自動下載、匯入、刪除檔案
"""
import os
import logging
import json
from datetime import datetime
from typing import Optional, Dict, Any
from sqlalchemy import bindparam, create_engine
from sqlalchemy.orm import sessionmaker
import pandas as pd
import pytz
# 台北時區
TAIPEI_TZ = pytz.timezone('Asia/Taipei')
from services.google_drive_service import drive_service
from database.import_models import ImportJob, ImportConfig, Base
from database.manager import ensure_metadata_initialized
# 設定日誌
logger = logging.getLogger(__name__)
# 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite
def _create_engine_with_pool(db_path):
    """Build a SQLAlchemy engine for ``db_path``.

    Args:
        db_path: Either a full database URL or a bare filesystem path to a
            SQLite file.

    Returns:
        The engine returned by ``create_engine``.

    Behaviour by input:
        * ``postgresql://``, ``postgresql+psycopg2://``, ``postgresql+psycopg://``
          and the legacy ``postgres://`` alias get the pooled configuration
          with libpq-style ``connect_args``.
        * Any other value that already looks like a URL (contains ``://``),
          including ``sqlite://``, is passed to ``create_engine`` unchanged.
        * Anything else is treated as a SQLite file path.
    """
    # Legacy alias: rewrite to the scheme name the pooled branch below expects.
    if db_path.startswith('postgres://'):
        db_path = 'postgresql://' + db_path[len('postgres://'):]

    # Previously only the exact 'postgresql://' prefix matched, so a URL with
    # an explicit driver fell through to the SQLite fallback and was wrapped
    # as 'sqlite:///postgresql+psycopg2://...'.
    libpq_prefixes = (
        'postgresql://',
        'postgresql+psycopg2://',
        'postgresql+psycopg://',
    )
    if db_path.startswith(libpq_prefixes):
        return create_engine(
            db_path,
            pool_pre_ping=True,
            pool_size=5,
            max_overflow=10,
            pool_recycle=1800,
            pool_timeout=30,
            connect_args={
                'connect_timeout': 10,
                # Imports need a longer statement timeout than the default.
                'options': '-c statement_timeout=120000'
            }
        )

    # Already a URL (sqlite:// or any other dialect): use it as given instead
    # of mistaking it for a file path.
    if '://' in db_path:
        return create_engine(db_path)

    # Bare filesystem path -> SQLite file.
    return create_engine(f'sqlite:///{db_path}')
# Module-level engine and session factory.
# Prefer DATABASE_PATH from config.py; fall back to the DATABASE_PATH
# environment variable (or a default SQLite file) when config cannot be imported.
# Only the part after the last '@' is logged so that credentials embedded in a
# database URL never reach the log; a value without '@' is logged unchanged.
try:
    from config import DATABASE_PATH as CONFIG_DATABASE_PATH
    engine = _create_engine_with_pool(CONFIG_DATABASE_PATH)
    logger.info(f"使用資料庫: {CONFIG_DATABASE_PATH.split('@')[-1]}")
except ImportError:
    DATABASE_PATH = os.getenv('DATABASE_PATH', 'data/momo_database.db')
    engine = _create_engine_with_pool(DATABASE_PATH)
    # The fallback value may also be a full URL, so mask it the same way.
    logger.warning(f"無法匯入 config使用備援資料庫路徑: {DATABASE_PATH.split('@')[-1]}")
Session = sessionmaker(bind=engine)
class ImportService:
    """Import service: job tracking, config access and daily-sales imports."""
    def __init__(self):
        """Initialise the service by running the database-table setup."""
        self._init_database()
def _init_database(self):
    """Make sure the import-tracking tables exist.

    Any failure is logged and swallowed so that constructing the service
    never raises because of table setup.
    """
    try:
        on_postgres = str(engine.url).startswith('postgresql')
        ensure_metadata_initialized(engine, use_postgres_lock=on_postgres)
        logger.info("匯入追蹤表已初始化")
    except Exception as exc:
        logger.error(f"初始化資料庫表失敗: {exc}")
def get_config(self, key: str, default: str = None) -> Optional[str]:
    """
    Look up a configuration value by key.

    Args:
        key: Configuration key.
        default: Value returned when no row exists for ``key``.

    Returns:
        Optional[str]: The stored value, or ``default`` when the key is absent.
    """
    session = Session()
    try:
        row = session.query(ImportConfig).filter_by(config_key=key).first()
        return row.config_value if row else default
    finally:
        session.close()
def set_config(self, key: str, value: str, config_type: str = 'string', description: str = None):
    """
    Create or update a configuration row.

    Args:
        key: Configuration key.
        value: Configuration value.
        config_type: Type label stored alongside the value.
        description: Optional description; an existing description is only
            overwritten when a truthy one is supplied.

    Errors are rolled back and logged rather than raised.
    """
    session = Session()
    try:
        existing = session.query(ImportConfig).filter_by(config_key=key).first()
        if not existing:
            # No row yet: insert a fresh one.
            session.add(
                ImportConfig(
                    config_key=key,
                    config_value=value,
                    config_type=config_type,
                    description=description
                )
            )
        else:
            existing.config_value = value
            existing.config_type = config_type
            if description:
                existing.description = description
            # Stored as naive Taipei wall-clock time.
            existing.updated_at = datetime.now(TAIPEI_TZ).replace(tzinfo=None)
        session.commit()
        logger.info(f"配置已更新: {key} = {value}")
    except Exception as exc:
        session.rollback()
        logger.error(f"設定配置失敗: {exc}")
    finally:
        session.close()
def create_import_job(self, job_type: str, drive_file_id: str, drive_file_name: str,
                      drive_file_size: int = None) -> Optional[int]:
    """
    Insert a new import job in the ``pending`` state.

    Args:
        job_type: Job type (daily_sales or vendor_stockout).
        drive_file_id: Google Drive file ID.
        drive_file_name: File name.
        drive_file_size: File size.

    Returns:
        Optional[int]: The new job ID, or ``None`` when the insert failed.
    """
    initial_fields = {
        'job_type': job_type,
        'status': 'pending',
        'drive_file_id': drive_file_id,
        'drive_file_name': drive_file_name,
        'drive_file_size': drive_file_size,
        'progress_percent': 0.0,
        'current_step': '等待開始...',
    }
    session = Session()
    try:
        new_job = ImportJob(**initial_fields)
        session.add(new_job)
        session.commit()
        new_id = new_job.id
        logger.info(f"已建立匯入任務: ID={new_id}, 檔案={drive_file_name}")
        return new_id
    except Exception as exc:
        session.rollback()
        logger.error(f"建立匯入任務失敗: {exc}")
        return None
    finally:
        session.close()
def update_job_status(self, job_id: int, status: str, progress: float = None,
                      current_step: str = None, error_message: str = None):
    """
    Update a job's status and, optionally, its progress and messages.

    Args:
        job_id: Job ID.
        status: New status string.
        progress: Progress percentage; left untouched when ``None``.
        current_step: Step label; left untouched when falsy.
        error_message: Error text; left untouched when falsy.

    ``started_at`` is set the first time the job enters ``downloading`` or
    ``importing``; ``completed_at`` is set whenever the status is
    ``completed`` or ``failed``. Both are naive Taipei wall-clock times.
    Errors are rolled back and logged rather than raised.
    """
    session = Session()
    try:
        job = session.query(ImportJob).filter_by(id=job_id).first()
        if not job:
            logger.warning(f"找不到任務: ID={job_id}")
            return

        job.status = status
        if progress is not None:
            job.progress_percent = progress
        # Text fields are only overwritten by non-empty values.
        for attr, text_value in (('current_step', current_step),
                                 ('error_message', error_message)):
            if text_value:
                setattr(job, attr, text_value)

        if status in ('downloading', 'importing'):
            if not job.started_at:
                job.started_at = datetime.now(TAIPEI_TZ).replace(tzinfo=None)
        elif status in ('completed', 'failed'):
            job.completed_at = datetime.now(TAIPEI_TZ).replace(tzinfo=None)

        session.commit()
        logger.info(f"任務 {job_id} 狀態已更新: {status} ({progress}%)")
    except Exception as exc:
        session.rollback()
        logger.error(f"更新任務狀態失敗: {exc}")
    finally:
        session.close()
def update_job_progress(self, job_id: int, total_rows: int = None, processed_rows: int = None,
                        success_rows: int = None, error_rows: int = None):
    """
    Update a job's row counters and recompute its progress percentage.

    Args:
        job_id: Job ID.
        total_rows: Total number of rows.
        processed_rows: Rows processed so far.
        success_rows: Rows imported successfully.
        error_rows: Rows that failed.

    Counters passed as ``None`` are left unchanged. The percentage is only
    recomputed when both the total and the processed count are truthy.
    Errors are rolled back and logged rather than raised.
    """
    counters = (
        ('total_rows', total_rows),
        ('processed_rows', processed_rows),
        ('success_rows', success_rows),
        ('error_rows', error_rows),
    )
    session = Session()
    try:
        job = session.query(ImportJob).filter_by(id=job_id).first()
        if not job:
            return
        for attr, count in counters:
            if count is not None:
                setattr(job, attr, count)
        if job.total_rows and job.processed_rows:
            job.progress_percent = (job.processed_rows / job.total_rows) * 100
        session.commit()
    except Exception as exc:
        session.rollback()
        logger.error(f"更新任務進度失敗: {exc}")
    finally:
        session.close()
def get_job_status(self, job_id: int) -> Optional[Dict[str, Any]]:
    """
    Fetch one job as a dictionary.

    Args:
        job_id: Job ID.

    Returns:
        Optional[Dict]: ``job.to_dict()`` for the matching job, or ``None``
        when no such job exists.
    """
    session = Session()
    try:
        found = session.query(ImportJob).filter_by(id=job_id).first()
        return found.to_dict() if found else None
    finally:
        session.close()
def get_recent_jobs(self, limit: int = 20) -> list:
    """
    List the most recently created jobs.

    Args:
        limit: Maximum number of jobs to return.

    Returns:
        list: Job dictionaries ordered by ``created_at`` descending.
    """
    session = Session()
    try:
        newest_first = session.query(ImportJob).order_by(ImportJob.created_at.desc())
        rows = newest_first.limit(limit).all()
        return [row.to_dict() for row in rows]
    finally:
        session.close()
@staticmethod
def _has_any_column(cols, keywords):
"""檢查欄位中是否包含任一關鍵字。"""
normalized_cols = [str(col).strip() for col in cols]
return any(kw in col for col in normalized_cols for kw in keywords)
def _validate_daily_sales_columns(self, df: pd.DataFrame) -> list:
"""回傳 daily sales Excel 缺少的必要欄位分類。"""
required_groups = {
"商品名稱類": ["商品名稱", "品名", "Product", "Name"],
"業績金額類": ["銷售金額", "業績", "金額", "Amount", "Sales", "Total"],
}
return [
label
for label, keywords in required_groups.items()
if not self._has_any_column(df.columns, keywords)
]
@staticmethod
def _cleanup_excel_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""清理 Excel 讀取後的全空列欄與欄位名稱。"""
df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
df.columns = [str(col).strip() for col in df.columns]
return df
def _read_daily_sales_excel(self, file_path: str) -> pd.DataFrame:
    """
    Read a daily-sales Excel file, locating the real header row if needed.

    The default sheet is read first with its first row as the header. If
    that frame is non-empty and passes the required-column check it is
    returned as is. Otherwise the first 20 rows of every sheet are scanned
    for a row whose cells satisfy the required-column check; the sheet is
    then re-read with that row as the header.

    Args:
        file_path: Path to the Excel file.

    Returns:
        pd.DataFrame: The cleaned frame. When no header row is detected, the
        frame from the initial default read is returned (it may be empty or
        lack required columns; the caller validates it).
    """
    # Open the workbook once and close it deterministically. The previous
    # code created an ExcelFile that was never closed and reopened the file
    # from disk for every preview/parse call.
    with pd.ExcelFile(file_path, engine='openpyxl') as workbook:
        df = self._cleanup_excel_dataframe(pd.read_excel(workbook, dtype=str))
        if not df.empty and not self._validate_daily_sales_columns(df):
            return df

        for sheet_name in workbook.sheet_names:
            preview = pd.read_excel(
                workbook,
                sheet_name=sheet_name,
                header=None,
                nrows=20,
                dtype=str,
            )
            for header_row in range(len(preview.index)):
                candidate_columns = preview.iloc[header_row].dropna().astype(str).str.strip().tolist()
                if not candidate_columns:
                    continue
                # An empty frame carrying the candidate names is enough for
                # the column validator, which only looks at column names.
                candidate_df = pd.DataFrame(columns=candidate_columns)
                if self._validate_daily_sales_columns(candidate_df):
                    continue
                detected_df = pd.read_excel(
                    workbook,
                    sheet_name=sheet_name,
                    header=header_row,
                    dtype=str,
                )
                detected_df = self._cleanup_excel_dataframe(detected_df)
                logger.info(
                    f"Excel 表頭自動偵測成功: sheet={sheet_name}, header_row={header_row + 1}"
                )
                return detected_df
    return df
@staticmethod
def _normalize_dates_for_sql(date_values) -> list:
"""將日期值正規化成 YYYY-MM-DD 字串,供 SQL expanding bind 使用。"""
normalized_dates = []
for value in date_values:
if value is None or pd.isna(value):
continue
parsed = pd.to_datetime(value, errors='coerce')
if pd.notna(parsed):
normalized_dates.append(str(parsed.date()))
return sorted(set(normalized_dates))
@staticmethod
def _calculate_data_lag_days(date_max: str) -> Optional[int]:
"""計算匯入資料最大日期距今天數。"""
if not date_max:
return None
parsed = pd.to_datetime(date_max, errors='coerce')
if pd.isna(parsed):
return None
today = datetime.now(TAIPEI_TZ).date()
return max((today - parsed.date()).days, 0)
def process_daily_sales_import(self, job_id: int, file_path: str) -> bool:
    """
    Import one daily-sales Excel file into the two sales tables.

    Steps, in order:
      1. Read the Excel file and reject it when empty or when the required
         column groups are missing.
      2. Derive ``snapshot_date`` from the first matching date column, or
         from today's Taipei date when none of the known names is present.
      3. Prepare a copy for ``realtime_sales_monthly`` (without
         ``snapshot_date``, with ``%``, ``(`` and ``)`` replaced in column
         names) and drop columns the target table does not have.
      4. In a single transaction: delete same-date rows from
         ``daily_sales_snapshot``, append, verify the count; then do the
         same for ``realtime_sales_monthly``. The transaction is attempted
         up to ``max_retries + 1`` times.
      5. On success, mark the job completed and store a JSON summary on the
         job row; on failure, mark it failed and try to send a Telegram alert.

    Args:
        job_id: Job ID.
        file_path: Path to the Excel file.

    Returns:
        bool: True when both tables were written and verified; False on any
        validation failure, transaction failure or unexpected exception.
    """
    try:
        self.update_job_status(job_id, 'importing', 50, '正在匯入資料...')
        # Read the Excel file
        logger.info(f"開始讀取 Excel 檔案: {file_path}")
        df = self._read_daily_sales_excel(file_path)
        if df.empty:
            error_msg = "Excel 檔案為空"
            self.update_job_status(job_id, 'failed', 50, '匯入失敗', error_msg)
            return False
        # ─────────────────────────────────────────────
        # 2026-04-19: pre-flight column guard for daily_sales_snapshot (tech-debt fix)
        # Reason: if the Excel column names change silently, the import succeeds but the
        # Hermes SQL JOIN finds no data, which distorts the alerting pipeline.
        # Rule: at least one "product name" and one "sales amount" style column must be
        # detected (several aliases are tolerated).
        # ─────────────────────────────────────────────
        missing = self._validate_daily_sales_columns(df)
        if missing:
            error_msg = (
                f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}"
                f"現有欄位:{list(df.columns)[:30]}"
            )
            logger.error(error_msg)
            self.update_job_status(job_id, 'failed', 50, '欄位驗證失敗', error_msg)
            return False
        # Target table for the import
        table_name = 'daily_sales_snapshot'
        # Locate the date column (first match wins)
        date_col = None
        for possible_col in ['日期', '訂單日期', '交易日期', 'Date']:
            if possible_col in df.columns:
                date_col = possible_col
                break
        if date_col:
            # Parse dates; unparseable values become NaT
            df['snapshot_date'] = pd.to_datetime(df[date_col], errors='coerce').dt.date
            logger.info(f"使用日期欄位: {date_col}")
        else:
            # Fall back to the current date (Taipei time zone)
            df['snapshot_date'] = datetime.now(TAIPEI_TZ).date()
            logger.info("未找到日期欄位,使用當前日期(台北時區)")
        # Write to the database using the module-level engine (PostgreSQL or SQLite)
        from sqlalchemy import text
        # The engine defined at the top of the module is used so that the
        # write goes to the configured database.
        # Update progress
        total_rows = len(df)
        self.update_job_progress(job_id, total_rows=total_rows, processed_rows=0)
        # Collect the distinct dates covered by this import
        import_dates = self._normalize_dates_for_sql(df['snapshot_date'].unique())
        logger.info(f"本次匯入包含 {len(import_dates)} 個日期的資料")
        if not import_dates:
            # No valid date at all: refuse to write rows with unknown dates.
            error_msg = "匯入資料缺少有效日期,拒絕寫入以避免日期未知資料污染"
            self.update_job_status(job_id, 'failed', 55, '日期驗證失敗', error_msg)
            logger.error(f"任務 {job_id} {error_msg}")
            return False
        # === V-New 2026-01-15: also write to realtime_sales_monthly ===
        # Purpose: make the daily raw data visible on the sales analysis dashboard too.
        # 2026-05-05 fix: daily and monthly writes share one transaction to avoid
        # a half-successful import.
        self.update_job_status(job_id, 'importing', 80, '準備同步至業績分析儀表板...')
        sync_success = False
        sync_error_msg = None
        monthly_table = 'realtime_sales_monthly'
        # Prepare the data: drop snapshot_date (realtime_sales_monthly does not need it)
        df_monthly = df.drop(columns=['snapshot_date'], errors='ignore')
        # 2026-01-30 fix: stronger column-name conversion.
        # Special characters are replaced to get PostgreSQL-safe names.
        column_mapping = {}
        for col in df_monthly.columns:
            new_col = col.replace('%', '_pct').replace('(', '_').replace(')', '_')
            column_mapping[col] = new_col
        df_monthly = df_monthly.rename(columns=column_mapping)
        converted_cols = [f"'{k}' -> '{v}'" for k, v in column_mapping.items() if k != v]
        if converted_cols:
            logger.info(f"任務 {job_id} 欄位名稱轉換: {', '.join(converted_cols)}")
        logger.info(f"任務 {job_id} 欄位轉換完成,共 {len(df_monthly.columns)} 個欄位")
        # 2026-01-30 addition: compare DataFrame columns with the target table's columns
        with engine.connect() as conn:
            if engine.dialect.name == 'sqlite':
                col_query = text(f'PRAGMA table_info("{monthly_table}")')
                # Index 1 of each row is read as the column name; 'id' is excluded.
                target_columns = {row[1] for row in conn.execute(col_query) if row[1] != 'id'}
            else:
                col_query = text("""
SELECT column_name FROM information_schema.columns
WHERE table_name = :table_name AND column_name != 'id'
ORDER BY ordinal_position
""")
                target_columns = {
                    row[0] for row in conn.execute(col_query, {'table_name': monthly_table})
                }
        df_columns = set(df_monthly.columns)
        missing_in_table = df_columns - target_columns
        missing_in_df = target_columns - df_columns
        if missing_in_table:
            logger.warning(f"任務 {job_id} 欄位警告: DataFrame 有但表中沒有: {missing_in_table}")
            # Drop columns the table does not have so the INSERT does not fail
            df_monthly = df_monthly.drop(columns=list(missing_in_table), errors='ignore')
            logger.info(f"任務 {job_id} 已移除多餘欄位,剩餘 {len(df_monthly.columns)} 個欄位")
        if missing_in_df:
            logger.warning(f"任務 {job_id} 欄位警告: 表中有但 DataFrame 沒有: {missing_in_df}")
        # Dates used for the monthly delete/verify: taken from the source date
        # column when there is one, otherwise from snapshot_date.
        unique_dates = self._normalize_dates_for_sql(
            df[date_col].dropna().unique() if date_col and date_col in df.columns else df['snapshot_date'].dropna().unique()
        )
        logger.info(f"任務 {job_id} 準備同步 {len(unique_dates)} 個日期的資料")
        if not unique_dates:
            error_msg = "realtime_sales_monthly 同步缺少有效日期,拒絕寫入"
            self.update_job_status(job_id, 'failed', 85, '同步日期驗證失敗', error_msg)
            logger.error(f"任務 {job_id} {error_msg}")
            return False
        # Dialect-specific expressions that reduce the stored value to a date.
        snapshot_date_expr = 'date(snapshot_date)' if engine.dialect.name == 'sqlite' else 'snapshot_date::date'
        monthly_date_expr = 'date("日期")' if engine.dialect.name == 'sqlite' else '"日期"::date'
        # Only rows whose snapshot_date is one of import_dates count toward the
        # daily expectation; the monthly expectation is every row in df_monthly.
        expected_daily_count = len(df[df['snapshot_date'].astype(str).isin(import_dates)])
        expected_monthly_count = len(df_monthly)
        max_retries = 2
        retry_count = 0
        # Up to max_retries + 1 attempts; each attempt is one transaction.
        while retry_count <= max_retries and not sync_success:
            try:
                if retry_count > 0:
                    logger.warning(f"任務 {job_id}{retry_count} 次重試原子匯入...")
                    self.update_job_status(job_id, 'importing', 82, f'重試原子匯入中 ({retry_count}/{max_retries})...')
                self.update_job_status(job_id, 'importing', 85, '原子寫入兩張業績表...')
                # engine.begin() commits on normal exit and rolls back when an
                # exception leaves the block, so both tables succeed or fail together.
                with engine.begin() as conn:
                    # Overwrite mode: remove existing rows for the imported dates first.
                    delete_snapshot_query = text(
                        f"DELETE FROM {table_name} WHERE {snapshot_date_expr} IN :dates"
                    ).bindparams(bindparam('dates', expanding=True))
                    deleted_snapshot = conn.execute(delete_snapshot_query, {'dates': import_dates}).rowcount
                    if deleted_snapshot > 0:
                        logger.info(f"已刪除 {deleted_snapshot} 筆 daily_sales_snapshot 舊資料(覆蓋模式)")
                    df.to_sql(
                        table_name,
                        conn,
                        if_exists='append',
                        index=False,
                        method='multi',
                        chunksize=1000
                    )
                    # Verify the row count inside the same transaction.
                    verify_snapshot_query = text(
                        f"SELECT COUNT(*) FROM {table_name} WHERE {snapshot_date_expr} IN :dates"
                    ).bindparams(bindparam('dates', expanding=True))
                    daily_count = conn.execute(verify_snapshot_query, {'dates': import_dates}).scalar()
                    if daily_count < expected_daily_count:
                        raise RuntimeError(
                            f"daily_sales_snapshot 寫入驗證失敗: 預期 {expected_daily_count} 筆, 實際 {daily_count}"
                        )
                    # Same delete / append / verify sequence for the monthly table.
                    delete_monthly_query = text(
                        f'DELETE FROM {monthly_table} WHERE {monthly_date_expr} IN :dates'
                    ).bindparams(bindparam('dates', expanding=True))
                    deleted_monthly = conn.execute(delete_monthly_query, {'dates': unique_dates}).rowcount
                    if deleted_monthly > 0:
                        logger.info(f"任務 {job_id} 已從 {monthly_table} 刪除 {deleted_monthly} 筆同日期舊資料")
                    df_monthly.to_sql(
                        monthly_table,
                        conn,
                        if_exists='append',
                        index=False,
                        method='multi',
                        chunksize=1000
                    )
                    verify_monthly_query = text(
                        f'SELECT COUNT(*) FROM {monthly_table} WHERE {monthly_date_expr} IN :dates'
                    ).bindparams(bindparam('dates', expanding=True))
                    monthly_count = conn.execute(verify_monthly_query, {'dates': unique_dates}).scalar()
                    if monthly_count < expected_monthly_count:
                        raise RuntimeError(
                            f"{monthly_table} 寫入驗證失敗: 預期 {expected_monthly_count} 筆, 實際 {monthly_count}"
                        )
                sync_success = True
                logger.info(
                    f"任務 {job_id} 原子匯入成功: daily={expected_daily_count} 筆, "
                    f"monthly={expected_monthly_count}"
                )
            except Exception as transaction_error:
                retry_count += 1
                sync_error_msg = str(transaction_error)
                logger.error(
                    f"任務 {job_id} 原子匯入失敗 (嘗試 {retry_count}/{max_retries + 1}): {sync_error_msg}",
                    exc_info=True,
                )
                if retry_count > max_retries:
                    break
        if not sync_success:
            error_msg = f"原子匯入失敗,兩張表已回滾: {sync_error_msg}"
            self.update_job_status(job_id, 'failed', 90, '原子匯入失敗', error_msg)
            logger.error(f"任務 {job_id} {error_msg}")
            # Best-effort alert: a notification failure is logged, not raised.
            try:
                from services.notification_manager import NotificationManager
                notifier = NotificationManager()
                alert_msg = (
                    f"⚠️ 業績資料原子匯入失敗告警\n"
                    f"{'='*30}\n"
                    f"任務 ID: {job_id}\n"
                    f"錯誤: {sync_error_msg[:200]}\n"
                    f"{'='*30}\n"
                    f"本次 daily_sales_snapshot / realtime_sales_monthly 已一起 rollback請檢查匯入檔案"
                )
                notifier._send_telegram_messages([alert_msg])
                logger.info(f"任務 {job_id} 已發送原子匯入失敗告警")
            except Exception as notify_error:
                logger.error(f"任務 {job_id} 發送告警失敗: {notify_error}")
            return False
        # Record success counters
        self.update_job_progress(
            job_id,
            processed_rows=total_rows,
            success_rows=total_rows
        )
        # 2026-01-30 fix: choose the completion message from the sync status.
        # NOTE(review): the failure path above returns, so sync_success is always
        # True here and the else branch cannot run.
        if sync_success:
            completion_msg = '匯入完成(已同步至業績分析儀表板)'
        else:
            completion_msg = '匯入完成(警告:業績分析儀表板同步失敗,需手動處理)'
        self.update_job_status(
            job_id,
            'completed',
            100,
            completion_msg
        )
        # Compute the imported date range
        date_min = None
        date_max = None
        imported_dates = self._normalize_dates_for_sql(df['snapshot_date'].dropna().unique())
        data_lag_days = None
        valid_dates = imported_dates
        if len(valid_dates) > 0:
            sorted_dates = sorted(valid_dates)
            if sorted_dates:
                date_min = sorted_dates[0]
                date_max = sorted_dates[-1]
                data_lag_days = self._calculate_data_lag_days(date_max)
                logger.info(f"任務 {job_id} 日期範圍: {date_min} ~ {date_max}")
        # Build the import summary (2026-01-30 fix: include the sync status).
        # NOTE(review): as above, only the sync_success branch is reachable here.
        if sync_success:
            sync_message = f'成功匯入 {total_rows} 筆資料,已同步至業績分析儀表板'
        else:
            sync_message = f'成功匯入 {total_rows} 筆資料,但同步至業績分析儀表板失敗: {sync_error_msg}'
        summary = {
            'imported_count': total_rows,
            'table_name': table_name,
            'synced_to': 'realtime_sales_monthly' if sync_success else None,
            'sync_success': sync_success,
            'sync_error': sync_error_msg,
            'verified': True,  # daily_sales_snapshot verification (hard-coded value)
            'date_min': date_min,
            'date_max': date_max,
            'imported_dates': imported_dates,
            'data_lag_days': data_lag_days,
            'message': sync_message
        }
        # Persist the summary as JSON on the job row.
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if job:
                job.import_summary = json.dumps(summary, ensure_ascii=False)
                session.commit()
        finally:
            session.close()
        logger.info(f"任務 {job_id} 匯入成功: {total_rows}")
        # Cache invalidation now relies on _get_data_fingerprint (DB max(snapshot_date)+count(*)):
        # after a write the fingerprint changes, so each of the 4 workers detects the
        # invalidation on its next request. This replaces the unreliable N-POST hack
        # (hit rate only 9.4%, see the web-researcher report).
        return True
    except Exception as e:
        # Any unexpected error marks the job failed and is logged with its traceback.
        error_msg = f"匯入過程發生異常: {str(e)}"
        self.update_job_status(job_id, 'failed', 50, '匯入失敗', error_msg)
        logger.error(f"任務 {job_id} 匯入異常: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return False
def auto_import_from_drive(self) -> Dict[str, Any]:
    """
    Import every matching file found in the configured Google Drive folder.

    For each file: create a job, download it to ``data/temp``, run
    ``process_daily_sales_import``, then move the Drive file to the archive
    folder on success or to the failed folder on failure. The local temp
    file is always removed afterwards.

    When the folder is empty, a staleness check runs: if the newest
    ``snapshot_date`` in ``daily_sales_snapshot`` is 3 or more days behind
    today (Taipei time), a data-stale alert is sent.

    Returns:
        Dict: Always contains ``success``, ``message`` and ``file_count``.
        When files were processed it also contains ``imported_count``,
        ``total_rows``, ``date_range`` and ``data_lag_days``, plus
        ``failed_count`` / ``failed_files`` when anything failed.
        Exceptions whose message looks like a connection/auth problem are
        reported as ``success=True`` with ``connection_error=True`` so that
        no alert is raised; other exceptions yield ``success=False``.
    """
    try:
        # Read configuration
        folder_path = self.get_config('gdrive_folder_path', '業績報表/當日業績')
        file_pattern = self.get_config('gdrive_file_pattern', '即時業績_當日')
        logger.info(f"開始檢查 Google Drive: {folder_path}")
        # List candidate files
        files = drive_service.list_files_in_folder(folder_path, file_pattern)
        if not files:
            logger.info("沒有找到待匯入的檔案")
            # Staleness gate (critic-approved 2026-05-03)
            # 'move-then-success' anti-pattern: after a successful import, move_file puts
            # the Excel into the archive folder, so later scheduled runs list an empty
            # folder and silently return success here. That left daily_sales_snapshot
            # stale for 8 days (4/27~5/2) without any alert. Active detection: when Drive
            # is empty and the DB has had no new data for >= 3 days, send a reminder
            # alert (the threshold avoids false alarms over weekends/holidays).
            try:
                from database.manager import get_session
                from sqlalchemy import text
                from services.openclaw_strategist_service import _send_data_stale_alert
                _stale_session = get_session()
                try:
                    last_date = _stale_session.execute(
                        text("SELECT MAX(snapshot_date)::date FROM daily_sales_snapshot")
                    ).scalar()
                finally:
                    _stale_session.close()
                if last_date:
                    # Use Taipei "today", like every other date in this module;
                    # date.today() follows the server time zone and can be a day off.
                    days_since = (datetime.now(TAIPEI_TZ).date() - last_date).days
                    if days_since >= 3:
                        _send_data_stale_alert(
                            report_type="upstream_drive",
                            last_date=str(last_date),
                            period=f"已停更 {days_since}",
                        )
            except Exception:
                # The staleness check is best-effort and must not fail the run.
                logger.error(
                    "staleness check failed in auto_import_from_drive",
                    exc_info=True,
                )
            return {
                'success': True,
                'message': '沒有找到待匯入的檔案',
                'file_count': 0
            }
        # Process each file
        imported_count = 0
        total_rows = 0
        all_dates = []  # every date imported across all files
        failed_files = []
        data_lag_days = None
        for file in files:
            file_id = file['id']
            file_name = file['name']
            file_size = file.get('size', 0)
            logger.info(f"發現檔案: {file_name}")
            # Create the import job
            job_id = self.create_import_job('daily_sales', file_id, file_name, file_size)
            if not job_id:
                failed_files.append(file_name)
                logger.error(f"建立匯入任務失敗,跳過檔案: {file_name}")
                continue
            # Download the file
            self.update_job_status(job_id, 'downloading', 10, '正在下載檔案...')
            temp_dir = 'data/temp'
            os.makedirs(temp_dir, exist_ok=True)
            # The Drive file name is external input: keep only its final path
            # component so separators or '..' cannot place the download
            # outside temp_dir. Fall back to the file ID if nothing usable is left.
            safe_name = os.path.basename(str(file_name).replace('\\', '/'))
            if safe_name in ('', '.', '..'):
                safe_name = str(file_id)
            local_path = os.path.join(temp_dir, safe_name)
            try:
                if not drive_service.download_file(file_id, local_path):
                    self.update_job_status(job_id, 'failed', 10, '下載失敗', '無法從 Google Drive 下載檔案')
                    failed_files.append(file_name)
                    logger.error(f"Google Drive 檔案下載失敗: {file_name}")
                    continue
                # Record the local path on the job
                session = Session()
                try:
                    job = session.query(ImportJob).filter_by(id=job_id).first()
                    if job:
                        job.local_file_path = local_path
                        session.commit()
                finally:
                    session.close()
                self.update_job_status(job_id, 'downloading', 40, '下載完成')
                # Import the data
                if self.process_daily_sales_import(job_id, local_path):
                    # Move the Drive file into the archive folder
                    self.update_job_status(job_id, 'completed', 90, '正在移動雲端檔案...')
                    # Archive folder name comes from configuration
                    archive_folder = self.get_config('gdrive_archive_folder', '已匯入')
                    if drive_service.move_file(file_id, archive_folder):
                        logger.info(f"已移動 Google Drive 檔案到「{archive_folder}」: {file_name}")
                    else:
                        logger.warning(f"無法移動 Google Drive 檔案: {file_name}")
                    self.update_job_status(job_id, 'completed', 100, '完成')
                    imported_count += 1
                    # Read the job summary for row counts and the date range
                    session = Session()
                    try:
                        job = session.query(ImportJob).filter_by(id=job_id).first()
                        if job and job.import_summary:
                            summary = json.loads(job.import_summary)
                            total_rows += summary.get('imported_count', 0)
                            if summary.get('date_min'):
                                all_dates.append(summary['date_min'])
                            if summary.get('date_max'):
                                all_dates.append(summary['date_max'])
                            all_dates.extend(summary.get('imported_dates') or [])
                            if summary.get('data_lag_days') is not None:
                                lag_value = summary.get('data_lag_days')
                                # Report the largest lag seen across files.
                                data_lag_days = lag_value if data_lag_days is None else max(data_lag_days, lag_value)
                        elif job:
                            # V-Fix: avoid a notification showing 0 rows / unknown dates
                            # when the summary is missing. The summary is the primary
                            # source; without it, fall back to the job's progress
                            # counters and leave a warning.
                            fallback_rows = job.success_rows or job.total_rows or 0
                            total_rows += fallback_rows
                            logger.warning(
                                f"任務 {job_id} 匯入成功但缺少 import_summary"
                                f"已使用 job 進度欄位回補筆數: {fallback_rows}"
                            )
                    finally:
                        session.close()
                else:
                    failed_files.append(file_name)
                    logger.error(f"檔案匯入失敗,準備移至失敗資料夾: {file_name}")
                    failed_folder = self.get_config('gdrive_failed_folder', '匯入失敗')
                    if drive_service.move_file(file_id, failed_folder):
                        logger.info(f"已移動失敗檔案到「{failed_folder}」: {file_name}")
                    else:
                        logger.warning(f"無法移動失敗檔案,保留於原資料夾待人工檢查: {file_name}")
            finally:
                # Always clean up the local copy. Previously this only happened
                # on the success path, so failed downloads/imports left files
                # behind in data/temp. The Drive copy remains available.
                if os.path.exists(local_path):
                    try:
                        os.remove(local_path)
                        logger.info(f"已清理本地檔案: {local_path}")
                    except Exception as e:
                        logger.warning(f"清理本地檔案失敗: {str(e)}")
        # Compute the overall date range
        date_range = None
        if all_dates:
            sorted_dates = sorted(set(all_dates))
            date_range = {
                'min': sorted_dates[0],
                'max': sorted_dates[-1]
            }
        if failed_files or imported_count == 0:
            failed_count = len(files) - imported_count
            # Separate the names so they stay readable in the message
            # (they were previously concatenated with no separator).
            failed_label = '、'.join(failed_files[:5]) if failed_files else '無成功匯入檔案'
            return {
                'success': False,
                'message': (
                    f'找到 {len(files)} 個檔案,但成功匯入 {imported_count} 個、'
                    f'失敗 {failed_count} 個:{failed_label}'
                ),
                'file_count': len(files),
                'imported_count': imported_count,
                'failed_count': failed_count,
                'failed_files': failed_files,
                'total_rows': total_rows,
                'date_range': date_range,
                'data_lag_days': data_lag_days
            }
        return {
            'success': True,
            'message': f'成功匯入 {imported_count} 個檔案',
            'file_count': len(files),
            'imported_count': imported_count,
            'total_rows': total_rows,
            'date_range': date_range,
            'data_lag_days': data_lag_days
        }
    except Exception as e:
        error_msg = str(e)
        logger.error(f"自動匯入失敗: {error_msg}")
        # Distinguish connection problems from real import errors.
        # Connection problems (Broken pipe, network issues, ...) should not raise an alert.
        connection_errors = [
            'Broken pipe',
            'Connection refused',
            'Connection reset',
            'Connection timed out',
            'Name or service not known',
            'No route to host',
            'Network is unreachable',
            'SSL',
            'authenticate',
            'credentials',
            'token'
        ]
        # Case-insensitive substring match against the exception message.
        is_connection_error = any(err.lower() in error_msg.lower() for err in connection_errors)
        if is_connection_error:
            # Connection problem: report success with no files (avoids an alert)
            logger.warning(f"Google Drive 連線問題,跳過本次匯入檢查: {error_msg}")
            return {
                'success': True,  # marked successful to avoid an alert
                'message': f'Google Drive 連線問題,跳過本次檢查',
                'file_count': 0,
                'imported_count': 0,
                'connection_error': True  # flags the connection problem for logging
            }
        else:
            # Real import error: report failure
            return {
                'success': False,
                'message': f'自動匯入失敗: {error_msg}',
                'file_count': 0,
                'imported_count': 0
            }
# Module-level service instance. Constructing it calls ImportService.__init__,
# which runs the database-table setup when this module is imported.
import_service = ImportService()