feat: EwoooC 初始化 — 完整專案推版至 Gitea
Some checks failed
CD Pipeline / deploy (push) Failing after 59s

- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-19 01:21:13 +08:00
commit 1b4f3a7bbe
504 changed files with 387725 additions and 0 deletions

View File

@@ -0,0 +1,59 @@
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
# 讀取需要修復的商品清單
with open('incorrect_images.txt', 'r') as f:
i_codes = [line.strip() for line in f if line.strip()]
print(f"🔍 分析剩餘 {len(i_codes)} 個錯誤商品...\n")
# 分析圖片 URL 格式
old_format = 0 # img2.momoshop.com.tw
og_format_wrong = 0 # og.momoshop.com.tw 但 i_code 不對
no_image = 0
old_format_list = []
og_wrong_list = []
for i_code in i_codes[:100]: # 只檢查前 100 個
product = session.query(Product).filter(Product.i_code == i_code).first()
if product:
if not product.image_url:
no_image += 1
elif 'img' in product.image_url and 'momoshop.com.tw/ecm/img/online' in product.image_url:
old_format += 1
old_format_list.append(i_code)
elif 'og.momoshop.com.tw' in product.image_url:
og_format_wrong += 1
og_wrong_list.append(i_code)
print(f"📊 前 100 個商品分析:")
print(f" 舊格式 (img2.momoshop...): {old_format}")
print(f" 新格式但 i_code 錯誤: {og_format_wrong}")
print(f" 無圖片: {no_image}")
print(f"\n舊格式商品範例 (前 10 個):")
for i_code in old_format_list[:10]:
product = session.query(Product).filter(Product.i_code == i_code).first()
print(f" [{i_code}] {product.name[:40]}...")
print(f" {product.image_url[:80]}...")
if og_wrong_list:
print(f"\n新格式但 i_code 錯誤的商品範例 (前 5 個):")
for i_code in og_wrong_list[:5]:
product = session.query(Product).filter(Product.i_code == i_code).first()
print(f" [{i_code}] {product.name[:40]}...")
print(f" {product.image_url}")
session.close()

View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
將導航列改為藍色調主題
"""
import os
import re
from pathlib import Path
def apply_blue_navbar(file_path):
"""將導航列改為藍色調主題"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
# 將黑色導航列改為藍色主題(使用 navbar-dark 配合自定義背景)
content = re.sub(
r'<nav class="navbar navbar-expand-xl navbar-dark bg-dark([^"]*)"',
r'<nav class="navbar navbar-expand-xl navbar-dark bg-primary\1"',
content
)
# 檢查是否已經有導航列的自定義樣式
if 'navbar-dark.bg-primary' not in content and '.navbar {' in content:
# 在第一個 .navbar { 之前添加藍色主題樣式
blue_navbar_style = """ /* Blue Navbar Theme */
.navbar-dark.bg-primary {
background: linear-gradient(135deg, #4F46E5 0%, #6366F1 100%) !important;
box-shadow: 0 4px 12px rgba(79, 70, 229, 0.3);
}
.navbar-dark .navbar-brand {
color: #ffffff !important;
font-weight: 600;
}
.navbar-dark .navbar-nav .nav-link {
color: rgba(255, 255, 255, 0.9) !important;
font-weight: 500;
transition: all 0.3s;
}
.navbar-dark .navbar-nav .nav-link:hover {
color: #ffffff !important;
background: rgba(255, 255, 255, 0.1);
border-radius: 6px;
}
.navbar-dark .navbar-nav .nav-link.active {
color: #ffffff !important;
background: rgba(255, 255, 255, 0.15);
border-radius: 6px;
font-weight: 600;
}
.navbar-dark .navbar-text {
color: rgba(255, 255, 255, 0.8) !important;
}
"""
# 在第一個 .navbar { 之前插入
content = content.replace(' .navbar {', blue_navbar_style + ' .navbar {', 1)
elif 'navbar-dark.bg-primary' not in content:
# 如果沒有 .navbar { 則在 </style> 之前添加
blue_navbar_style = """
/* Blue Navbar Theme */
.navbar-dark.bg-primary {
background: linear-gradient(135deg, #4F46E5 0%, #6366F1 100%) !important;
box-shadow: 0 4px 12px rgba(79, 70, 229, 0.3);
}
.navbar-dark .navbar-brand {
color: #ffffff !important;
font-weight: 600;
}
.navbar-dark .navbar-nav .nav-link {
color: rgba(255, 255, 255, 0.9) !important;
font-weight: 500;
transition: all 0.3s;
}
.navbar-dark .navbar-nav .nav-link:hover {
color: #ffffff !important;
background: rgba(255, 255, 255, 0.1);
border-radius: 6px;
}
.navbar-dark .navbar-nav .nav-link.active {
color: #ffffff !important;
background: rgba(255, 255, 255, 0.15);
border-radius: 6px;
font-weight: 600;
}
.navbar-dark .navbar-text {
color: rgba(255, 255, 255, 0.8) !important;
}
"""
content = content.replace('</style>', blue_navbar_style + ' </style>')
# 只有內容改變時才寫入
if content != original_content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return True
return False
except Exception as e:
print(f"處理 {file_path} 時發生錯誤: {e}")
return False
def main():
"""主程序"""
base_dir = Path(__file__).parent
html_files = list(base_dir.glob("*.html"))
updated_count = 0
print(f"找到 {len(html_files)} 個 HTML 文件")
print("=" * 60)
for html_file in html_files:
if apply_blue_navbar(html_file):
print(f"✅ 已更新: {html_file.name}")
updated_count += 1
else:
print(f"⏭️ 跳過: {html_file.name}")
print("=" * 60)
print(f"✅ 完成!共更新 {updated_count} 個文件為藍色主題")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,93 @@
import os
import sys
import re
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
def check_image_url_match(i_code: str, image_url: str) -> bool:
"""檢查圖片 URL 是否包含對應的 i_code"""
if not image_url:
return False
# TP 開頭的商品,檢查完整 i_code
if i_code.startswith('TP'):
return i_code in image_url
# 數字商品,檢查多種格式
code_num = str(int(i_code))
# 格式1: /{i_code}_ 或 /{i_code}.
if f"/{code_num}_" in image_url or f"/{code_num}." in image_url:
return True
# 格式2: 分段格式 /0014/548/538/ for 14548538
code_str = code_num.zfill(8)
part3 = code_str[-3:]
part2 = code_str[-6:-3]
part1 = code_str[:-6].zfill(4)
path_pattern = f"/{part1}/{part2}/{part3}/"
if path_pattern in image_url:
return True
return False
print("🔍 檢查全部商品的圖片 URL...\n")
print("=" * 100)
# 獲取所有有圖片的商品
products = session.query(Product).filter(Product.image_url.isnot(None), Product.image_url != '').all()
total = len(products)
correct = 0
incorrect = 0
incorrect_list = []
print(f"總商品數(有圖片): {total}\n")
print("檢查中...")
for i, product in enumerate(products, 1):
if i % 500 == 0:
print(f" 進度: {i}/{total} ({i*100//total}%)")
if check_image_url_match(product.i_code, product.image_url):
correct += 1
else:
incorrect += 1
incorrect_list.append(product.i_code)
print(f"\n" + "=" * 100)
print(f"\n📊 檢查結果:")
print(f" ✅ 正確: {correct} ({correct*100//total}%)")
print(f" ❌ 錯誤: {incorrect} ({incorrect*100//total}%)")
print(f" 總計: {total}")
if incorrect > 0:
print(f"\n⚠️ 發現 {incorrect} 個商品的圖片 URL 不正確")
print(f"\n需要修復的商品 i_code 清單:")
# 輸出到文件
with open('incorrect_images.txt', 'w') as f:
for i_code in incorrect_list:
f.write(f"{i_code}\n")
print(f" 已保存到 incorrect_images.txt")
# 顯示前 50 個
print(f"\n前 50 個需要修復的商品:")
for i, i_code in enumerate(incorrect_list[:50], 1):
print(f" {i}. {i_code}")
if len(incorrect_list) > 50:
print(f" ... 還有 {len(incorrect_list) - 50}")
session.close()

View File

@@ -0,0 +1,33 @@
from app import app
from database.manager import DatabaseManager
from database.models import MonthlySummaryAnalysis
from sqlalchemy import func
db = DatabaseManager()
session = db.get_session()
print("Checking 2025 data...")
count = session.query(func.count(MonthlySummaryAnalysis.id)).filter(MonthlySummaryAnalysis.year == 2025).scalar()
print(f"Total rows for 2025: {count}")
print("Checking 2024 data...")
count_24 = session.query(func.count(MonthlySummaryAnalysis.id)).filter(MonthlySummaryAnalysis.year == 2024).scalar()
print(f"Total rows for 2024: {count_24}")
print("Checking Top 12 Query Logic...")
# Simulate div_dist_q logic manually
try:
q = session.query(
MonthlySummaryAnalysis.division,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales')
).group_by(MonthlySummaryAnalysis.division).order_by(func.sum(MonthlySummaryAnalysis.sales_amt_curr).desc()).limit(12)
# Apply NO filters first
print(f"Top 12 (No Filters) Count: {len(q.all())}")
# Apply Month filter (e.g. month=1)
q_month = q.filter(MonthlySummaryAnalysis.month == 1)
print(f"Top 12 (Month=1) Count: {len(q_month.all())}")
except Exception as e:
print(f"Query failed: {e}")

View File

@@ -0,0 +1,43 @@
import os
import sys
from sqlalchemy import inspect, text
# ================= 環境設定 =================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
try:
from database.manager import DatabaseManager
except ImportError as e:
print(f"❌ 無法導入模組: {e}")
print("請確認您已安裝所有套件 (pip install -r requirements.txt) 並在專案根目錄執行。")
sys.exit(1)
def check_data():
print("🔍 正在檢查資料庫狀態...")
try:
db = DatabaseManager()
engine = db.engine
inspector = inspect(engine)
tables = inspector.get_table_names()
print(f"📂 資料庫路徑: {engine.url}")
print(f"📑 資料表清單: {tables}")
print("-" * 30)
with engine.connect() as conn:
for table in tables:
try:
count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar()
print(f"{table:<25} : {count}")
except Exception as e:
print(f"{table:<25} : 讀取錯誤 ({e})")
print("-" * 30)
print("檢查完成。")
except Exception as e:
print(f"🚨 資料庫連線失敗: {e}")
if __name__ == "__main__":
check_data()

View File

@@ -0,0 +1,29 @@
import os
# 取得目前腳本所在目錄 (專案根目錄)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 預期的資料庫路徑
db_path = os.path.join(BASE_DIR, 'data', 'momo_database.db')
print("="*40)
print("🔍 資料庫路徑檢查")
print("="*40)
print(f"📂 專案根目錄: {BASE_DIR}")
print(f"💾 資料庫檔案: {db_path}")
if os.path.exists(db_path):
size = os.path.getsize(db_path)
print(f"✅ 檔案存在 | 大小: {size} bytes ({size/1024/1024:.2f} MB)")
# 檢查是否還有 WAL 暫存檔 (可能導致鎖定或 I/O Error)
wal_path = db_path + "-wal"
shm_path = db_path + "-shm"
if os.path.exists(wal_path):
print(f"⚠️ 發現 WAL 暫存檔: {wal_path} (可能導致鎖定)")
if os.path.exists(shm_path):
print(f"⚠️ 發現 SHM 暫存檔: {shm_path}")
else:
print("❌ 檔案不存在")
print("="*40)

View File

@@ -0,0 +1,44 @@
import logging
from sqlalchemy import func
from database.manager import DatabaseManager
from database.models import Product
# 設定日誌
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def check_duplicate_products():
"""
檢查 products 表中是否有重複的 i_code
"""
db = DatabaseManager()
session = db.get_session()
try:
logging.info("🔍 開始檢查 products 資料庫中的重複 i_code...")
# 查詢重複的 i_code
# 使用 group_by 和 having 來找出出現次數大於 1 的 i_code
duplicates = session.query(Product.i_code, func.count(Product.i_code))\
.group_by(Product.i_code)\
.having(func.count(Product.i_code) > 1)\
.all()
if duplicates:
logging.warning(f"⚠️ 發現 {len(duplicates)} 組重複的 i_code:")
for i_code, count in duplicates:
logging.warning(f" - i_code: {i_code}, 重複次數: {count}")
# 列出重複項目的詳細資訊 (ID, 名稱, 分類)
products = session.query(Product).filter(Product.i_code == i_code).all()
for p in products:
logging.info(f" * ID: {p.id}, Name: {p.name}, Category: {p.category}")
else:
logging.info("✅ products 表中沒有發現重複的 i_code。")
except Exception as e:
logging.error(f"❌ 檢查過程中發生錯誤: {e}")
finally:
session.close()
if __name__ == "__main__":
check_duplicate_products()

View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
快速查詢郵件發送狀態
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import EmailSendLog
from datetime import datetime, timedelta
def check_email_status():
"""查詢最近的郵件發送狀態"""
db = VendorDatabaseManager()
session = db.get_session()
try:
# 查詢最近 7 天的發送記錄
since = datetime.now() - timedelta(days=7)
logs = session.query(EmailSendLog).filter(
EmailSendLog.created_at >= since
).order_by(EmailSendLog.created_at.desc()).all()
if not logs:
print("\n❌ 最近 7 天沒有郵件發送記錄")
print("\n💡 提示:")
print(" 1. 請先在網頁上選擇缺貨商品並點擊「按廠商發送」或「按商品發送」")
print(" 2. 或者執行測試腳本python3 test_email_send.py")
return
print("\n" + "=" * 80)
print(f"📧 最近 7 天郵件發送記錄 (共 {len(logs)} 筆)")
print("=" * 80)
# 統計
success_count = sum(1 for log in logs if log.status == 'sent')
failed_count = sum(1 for log in logs if log.status == 'failed')
pending_count = sum(1 for log in logs if log.status == 'pending')
print(f"\n📊 統計:")
print(f" ✅ 成功: {success_count}")
print(f" ❌ 失敗: {failed_count}")
print(f" ⏳ 待發送: {pending_count}")
if success_count > 0:
rate = (success_count / len(logs)) * 100
print(f" 📈 成功率: {rate:.1f}%")
print(f"\n📋 詳細記錄:")
print("-" * 80)
for log in logs[:20]: # 只顯示最近 20 筆
status_icon = "" if log.status == 'sent' else "" if log.status == 'failed' else ""
print(f"\n{status_icon} ID: {log.id} | 批次: {log.batch_id}")
print(f" 收件者: {log.recipient_email}")
print(f" 主旨: {log.subject}")
print(f" 商品數: {log.product_count}")
print(f" 狀態: {log.status}")
print(f" 時間: {log.sent_at or log.created_at}")
if log.error_message:
print(f" ⚠️ 錯誤: {log.error_message}")
if log.attachment_filename:
print(f" 📎 附件: {log.attachment_filename}")
if len(logs) > 20:
print(f"\n... 還有 {len(logs) - 20} 筆記錄(請使用網頁查看完整記錄)")
# 提示如何確認
print("\n" + "=" * 80)
print("💡 如何確認對方真的收到郵件?")
print("=" * 80)
print("1. 登入收件信箱檢查yingpin_chen@pchome.tw")
print("2. 檢查垃圾郵件資料夾(第一次發送可能被誤判)")
print("3. 查看網頁記錄http://localhost:5888/vendor-stockout/send-email")
print("\n⚠️ 注意:")
print(" - 狀態「成功」= Gmail SMTP 已接收郵件")
print(" - 但不保證對方信箱一定收到(可能被退信或進垃圾桶)")
print(" - 建議先發測試郵件到自己的信箱確認格式")
print("=" * 80)
except Exception as e:
print(f"\n❌ 查詢失敗: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
check_email_status()

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
檢查 Excel 檔案格式
"""
import pandas as pd
import sys
def check_excel_format(excel_path):
"""檢查 Excel 格式"""
print("=" * 80)
print(f"檢查 Excel 檔案格式")
print("=" * 80)
try:
# 讀取 Excel
df = pd.read_excel(excel_path)
print(f"\n📊 Excel 資訊:")
print(f" 總行數: {len(df)}")
print(f" 總欄位數: {len(df.columns)}")
print(f"\n📋 所有欄位名稱:")
for i, col in enumerate(df.columns, 1):
print(f" {i:2d}. {col}")
print(f"\n📄 第一行原始數據:")
print("-" * 80)
for col in df.columns:
value = df[col].iloc[0]
value_type = type(value).__name__
print(f" {col}: {value} (類型: {value_type})")
# 特別檢查日期相關欄位
print(f"\n📅 日期欄位詳細資訊:")
print("-" * 80)
date_columns = ['當前日期', '缺貨日期']
for col in date_columns:
if col in df.columns:
value = df[col].iloc[0]
print(f"\n 欄位: {col}")
print(f" 原始值: {value}")
print(f" 類型: {type(value)}")
print(f" 是否為 NaT: {pd.isna(value)}")
# 嘗試轉換
try:
converted = pd.to_datetime(value, errors='coerce')
print(f" 轉換後: {converted}")
if pd.notna(converted):
print(f" 轉換為日期: {converted.date()}")
except Exception as e:
print(f" 轉換失敗: {e}")
print("\n" + "=" * 80)
except Exception as e:
print(f"\n❌ 錯誤: {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("使用方式: python3 check_excel_format.py <Excel檔案路徑>")
print("\n請提供您的 Excel 檔案路徑")
sys.exit(1)
excel_path = sys.argv[1]
check_excel_format(excel_path)

View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
檢查 import_date 欄位的數據類型和值
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import VendorStockout
def check_import_date():
"""檢查 import_date 欄位"""
db = VendorDatabaseManager()
session = db.get_session()
try:
# 查詢最近 5 筆記錄
records = session.query(VendorStockout).order_by(
VendorStockout.id.desc()
).limit(5).all()
print("=" * 80)
print("檢查 import_date 欄位")
print("=" * 80)
for record in records:
print(f"\nID: {record.id}")
print(f" import_date 原始值: {record.import_date}")
print(f" import_date 類型: {type(record.import_date)}")
if record.import_date:
print(f" strftime 轉換: {record.import_date.strftime('%Y-%m-%d')}")
else:
print(f" ⚠️ import_date 是 None")
print(f" product_code: {record.product_code}")
print(f" product_name: {record.product_name}")
print("\n" + "=" * 80)
except Exception as e:
print(f"❌ 錯誤: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
check_import_date()

View File

@@ -0,0 +1,60 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
檢查匯入問題
"""
import pandas as pd
import sys
# 可能的路徑
possible_paths = [
'/Users/ogt/momo_pro_system/缺貨測試.xlsx',
'/Users/ogt/缺貨測試.xlsx',
'/Users/ogt/Downloads/缺貨測試.xlsx',
]
excel_path = None
for path in possible_paths:
try:
df = pd.read_excel(path)
excel_path = path
break
except:
continue
if not excel_path:
print("找不到 Excel 檔案")
sys.exit(1)
print("=" * 80)
print(f"Excel 路徑: {excel_path}")
print("=" * 80)
# 讀取 Excel
df = pd.read_excel(excel_path)
print(f"\n總行數: {len(df)}")
print(f"總欄位數: {len(df.columns)}")
print(f"\n所有欄位名稱(共 {len(df.columns)} 個):")
for i, col in enumerate(df.columns, 1):
print(f" {i:2d}. '{col}'")
print(f"\n第一行數據:")
print("-" * 80)
for col in df.columns:
value = df[col].iloc[0]
value_type = type(value).__name__
print(f" {col}: {value} (類型: {value_type})")
# 檢查關鍵欄位
print(f"\n檢查關鍵欄位:")
print("-" * 80)
key_fields = ['區ID', '區名稱', '借採轉', '缺貨日期', '缺貨天數', '商品可賣量', '庫存水位']
for field in key_fields:
if field in df.columns:
value = df[field].iloc[0]
print(f"{field}: {value} (類型: {type(value).__name__})")
else:
print(f"{field}: 欄位不存在")

View File

@@ -0,0 +1,109 @@
# cSpell:ignore momo
import os
import sys
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from database.edm_models import PromoProduct
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
def check_missing_images():
"""檢查各個資料表中缺少圖片的商品數量"""
print("🔍 正在檢查商品圖片狀態...\n")
if not os.path.exists(DB_PATH):
print(f"❌ 資料庫檔案不存在: {DB_PATH}")
return
try:
engine = create_engine(f"sqlite:///{DB_PATH}")
Session = sessionmaker(bind=engine)
session = Session()
# 1. 檢查一般商品 (Product)
print("=" * 60)
print("📦 一般商品 (Product)")
print("=" * 60)
total_products = session.query(func.count(Product.i_code)).scalar()
missing_images = session.query(func.count(Product.i_code)).filter(
(Product.image_url == None) | (Product.image_url == '')
).scalar()
has_images = total_products - missing_images
print(f"總商品數: {total_products:,}")
print(f"有圖片: {has_images:,} ({has_images/total_products*100:.1f}%)")
print(f"缺圖片: {missing_images:,} ({missing_images/total_products*100:.1f}%)")
# 顯示幾個缺圖的範例
if missing_images > 0:
print("\n📋 缺少圖片的商品範例 (前5筆):")
missing_products = session.query(Product).filter(
(Product.image_url == None) | (Product.image_url == '')
).limit(5).all()
for i, p in enumerate(missing_products, 1):
print(f"{i}. [{p.i_code}] {p.name[:40]}...")
# 2. 檢查促銷商品 (PromoProduct - EDM)
print("\n" + "=" * 60)
print("🎁 促銷商品 (EDM)")
print("=" * 60)
total_promo = session.query(func.count(PromoProduct.i_code)).filter(
PromoProduct.page_type == 'edm'
).scalar()
if total_promo > 0:
missing_promo = session.query(func.count(PromoProduct.i_code)).filter(
PromoProduct.page_type == 'edm',
(PromoProduct.image_url == None) | (PromoProduct.image_url == '')
).scalar()
has_promo = total_promo - missing_promo
print(f"總商品數: {total_promo:,}")
print(f"有圖片: {has_promo:,} ({has_promo/total_promo*100:.1f}%)")
print(f"缺圖片: {missing_promo:,} ({missing_promo/total_promo*100:.1f}%)")
else:
print("目前沒有促銷商品資料")
# 3. 檢查購物節商品 (從 PromoProduct 中篩選 page_type='festival')
print("\n" + "=" * 60)
print("🎉 購物節商品 (Festival)")
print("=" * 60)
total_festival = session.query(func.count(PromoProduct.i_code)).filter(
PromoProduct.page_type == 'festival'
).scalar()
if total_festival > 0:
missing_festival = session.query(func.count(PromoProduct.i_code)).filter(
PromoProduct.page_type == 'festival',
(PromoProduct.image_url == None) | (PromoProduct.image_url == '')
).scalar()
has_festival = total_festival - missing_festival
print(f"總商品數: {total_festival:,}")
print(f"有圖片: {has_festival:,} ({has_festival/total_festival*100:.1f}%)")
print(f"缺圖片: {missing_festival:,} ({missing_festival/total_festival*100:.1f}%)")
else:
print("目前沒有購物節商品資料")
print("\n" + "=" * 60)
print("✅ 檢查完成")
print("=" * 60)
session.close()
except Exception as e:
print(f"❌ 檢查失敗: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
check_missing_images()

View File

@@ -0,0 +1,53 @@
import pandas as pd # type: ignore
import os
import sys
# 設定專案路徑以導入模組
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.manager import DatabaseManager
def check_monthly_stats():
print("📊 正在分析資料庫中的月度業績數據...")
db = DatabaseManager()
try:
# 1. 取得資料表欄位名稱
df_head = pd.read_sql("SELECT * FROM realtime_sales_monthly LIMIT 1", db.engine)
cols = df_head.columns.tolist()
# 自動尋找日期欄位
date_col = next((c for c in cols if '日期' in c or 'Date' in c), None)
if not date_col:
print(f"❌ 找不到日期欄位。現有欄位: {cols}")
return
print(f"📅 偵測到日期欄位: {date_col}")
# 2. 讀取日期資料 (只讀取日期欄位以節省記憶體)
df = pd.read_sql(f"SELECT \"{date_col}\" FROM realtime_sales_monthly", db.engine)
if df.empty:
print("⚠️ 資料表為空。")
return
# 3. 轉換與統計
df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
monthly_counts = df.groupby(df[date_col].dt.to_period('M')).size()
print("\n" + "="*30)
print("🗓️ 各月份資料筆數統計")
print("="*30)
if monthly_counts.empty:
print("無法解析日期或無資料。")
else:
for period, count in monthly_counts.items():
print(f"{period}: {count:6d}")
print("="*30)
print(f"總計: {len(df)}")
except Exception as e:
print(f"❌ 讀取或分析失敗: {e}")
if __name__ == "__main__":
check_monthly_stats()

View File

@@ -0,0 +1,41 @@
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
# 指定的商品編號
i_codes = ['14672839', '9216349', '13261678', '6676895', '13240823', '13240822', '10911080', '3876337', '8863611']
print("🔍 檢查指定商品的圖片 URL:\n")
print("=" * 100)
for i_code in i_codes:
product = session.query(Product).filter(Product.i_code == i_code).first()
if product:
print(f"\n[{i_code}] {product.name[:60]}...")
print(f" 當前圖片: {product.image_url}")
# 檢查 URL 是否包含 i_code
if product.image_url and i_code in product.image_url:
print(f" ✅ URL 包含商品編號")
elif product.image_url:
print(f" ❌ URL 不包含商品編號")
else:
print(f" ⚠️ 無圖片")
else:
print(f"\n[{i_code}] ⚠️ 找不到此商品")
print("\n" + "=" * 100)
session.close()

View File

@@ -0,0 +1,62 @@
import logging
from sqlalchemy import desc
from database.manager import DatabaseManager
from database.edm_models import PromoProduct
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def cleanup_duplicate_promo_products():
"""
清理 promo_products 表,為每個 i_code 只保留最新的一筆紀錄。
"""
db = DatabaseManager()
session = db.get_session()
try:
logging.info("🧹 開始清理限時搶購 (promo_products) 資料庫中的重複數據...")
# 1. 找出所有唯一的 i_code
unique_icodes_query = session.query(PromoProduct.i_code).distinct()
unique_icodes = [item[0] for item in unique_icodes_query.all()]
total_unique_items = len(unique_icodes)
logging.info(f"🔍 發現 {total_unique_items} 個不重複的商品 (i_code)。")
deleted_count = 0
processed_count = 0
# 2. 遍歷每個 i_code
for i_code in unique_icodes:
# 找出該 i_code 的所有紀錄,按 ID 降序排列 (ID 越大代表越新)
records = session.query(PromoProduct).filter(PromoProduct.i_code == i_code).order_by(desc(PromoProduct.id)).all()
if len(records) > 1:
# 保留最新的一筆 (records[0]),刪除其餘的 (records[1:])
records_to_delete = records[1:]
num_to_delete = len(records_to_delete)
for record in records_to_delete:
session.delete(record)
deleted_count += num_to_delete
processed_count += 1
if processed_count % 100 == 0:
logging.info(f"🔄 已處理 {processed_count}/{total_unique_items} 個商品...")
# 3. 提交變更
if deleted_count > 0:
logging.info(f"⏳ 正在提交資料庫變更,準備刪除 {deleted_count} 筆舊紀錄...")
session.commit()
logging.info(f"✅ 清理完成!總共刪除了 {deleted_count} 筆重複的舊資料。")
else:
logging.info("✅ 資料庫很乾淨,無需清理。")
except Exception as e:
logging.error(f"❌ 清理過程中發生錯誤: {e}")
session.rollback()
finally:
session.close()
if __name__ == "__main__":
cleanup_duplicate_promo_products()

View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""清除 daily_sales_snapshot 資料表的所有資料"""
from database.manager import DatabaseManager
from sqlalchemy import text
if __name__ == '__main__':
db = DatabaseManager()
engine = db.engine
try:
with engine.connect() as conn:
result = conn.execute(text('DELETE FROM daily_sales_snapshot'))
conn.commit()
print(f'✅ 已清除 daily_sales_snapshot 資料表中的所有資料')
print(f'📊 刪除了 {result.rowcount} 筆記錄')
except Exception as e:
print(f'❌ 清除失敗: {e}')

View File

@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
import os
import sys
from sqlalchemy import inspect, text
# 將專案根目錄加入 sys.path 以便讀取模組
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
try:
from database.manager import DatabaseManager
except ImportError:
print("❌ 無法導入 DatabaseManager請確認您在專案根目錄下執行此腳本。")
sys.exit(1)
def clear_table():
"""清空指定的資料表"""
table_name = 'realtime_sales_monthly'
# V-Fix: 直接使用當前腳本路徑計算資料庫位置,確保與 app.py 一致 (修正路徑問題)
db_path = os.path.join(BASE_DIR, 'data', 'momo_database.db')
print(f"🚀 準備清空資料表: {table_name}")
print(f"📂 目標資料庫路徑: {db_path}")
db = DatabaseManager(db_path=db_path)
engine = db.engine
try:
inspector = inspect(engine)
if not inspector.has_table(table_name):
print(f" 資料表 '{table_name}' 不存在,無需清空。")
return
with engine.connect() as conn:
# 1. 查詢目前筆數
count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar()
conn.commit() # 提交隱式交易,避免鎖定
if count == 0:
print("⚠️ 資料表已空,無需執行。")
return
print(f"📊 目前共有 {count} 筆資料。")
# 2. 確認刪除
confirm = input("⚠️ 警告:確定要【永久刪除 (DROP TABLE)】所有資料嗎?(y/n): ")
if confirm.lower() == 'y':
print("⏳ 正在執行刪除...")
# V-Fix: 安全修復邏輯
try:
# 1. 嘗試強制寫入 WAL 暫存檔,解決鎖定問題
conn.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
# 2. 僅刪除指定的業績報表
with conn.begin():
conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
# 3. 重整資料庫檔案 (釋放空間並修復可能的頁面錯誤)
conn.execute(text("VACUUM"))
print(f"✅ 成功刪除資料表 '{table_name}' 並完成資料庫重整。")
print("✅ 其他資料 (商品、價格紀錄) 均已保留。")
except Exception as db_err:
print(f"❌ 資料庫操作失敗: {db_err}")
print("👉 請務必先【關閉 app.py】再執行此腳本")
return
print("==========================================")
print("⚠️ 重要提示:請務必【重新啟動 app.py】以清除網頁快取否則網頁上可能仍會顯示舊資料")
print("==========================================")
else:
print("已取消操作。")
except Exception as e:
print(f"❌ 發生錯誤: {e}")
if __name__ == "__main__":
clear_table()

View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
清除缺貨資料表
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import VendorStockout
def clear_stockout_data():
"""清除所有缺貨資料"""
db = VendorDatabaseManager()
session = db.get_session()
try:
# 統計資料數量
count = session.query(VendorStockout).count()
print("=" * 80)
print(f"清除缺貨資料")
print("=" * 80)
print(f"\n目前資料庫中有 {count} 筆缺貨資料")
print(f"\n⚠️ 即將清除所有資料!")
# 刪除所有記錄
session.query(VendorStockout).delete()
session.commit()
print(f"\n✅ 已清除 {count} 筆資料")
print(f"\n現在可以重新匯入正確的 Excel 資料了")
print("=" * 80)
except Exception as e:
session.rollback()
print(f"\n❌ 錯誤: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
clear_stockout_data()

View File

@@ -0,0 +1,51 @@
import os
import subprocess
import base64
ASSETS = [
("/Users/ogt/momo_pro_system/web/static/images/logo.png", "WOOO_Main_Logo"),
("/Users/ogt/momo_pro_system/web/static/images/logo_v4_glass.png", "WOOO_Glass_Logo"),
("/Users/ogt/momo_pro_system/web/static/images/logo_v4_gradient.png", "WOOO_Gradient_Logo"),
]
OUTPUT_DIR = "/Users/ogt/momo_pro_system/export_assets"
os.makedirs(OUTPUT_DIR, exist_ok=True)
for input_path, base_name in ASSETS:
print(f"Processing {base_name}...")
try:
# TIFF (Keep transparency)
subprocess.run(["convert", input_path, "-background", "none", os.path.join(OUTPUT_DIR, f"{base_name}.tiff")], check=True)
# JPG (White background, flattened)
subprocess.run(["convert", input_path, "-background", "white", "-flatten", os.path.join(OUTPUT_DIR, f"{base_name}.jpg")], check=True)
# EPS (Raster embedded)
subprocess.run(["convert", input_path, "-background", "none", os.path.join(OUTPUT_DIR, f"{base_name}.eps")], check=True)
# AI (PDF renamed to AI for compatibility)
pdf_path = os.path.join(OUTPUT_DIR, f"{base_name}.pdf")
ai_path = os.path.join(OUTPUT_DIR, f"{base_name}.ai")
subprocess.run(["convert", input_path, "-background", "none", pdf_path], check=True)
subprocess.run(["cp", pdf_path, ai_path], check=True)
# SVG (Embed base64)
with open(input_path, "rb") as img_file:
b64_data = base64.b64encode(img_file.read()).decode("utf-8")
# Get dimensions using identify
dim_res = subprocess.check_output(["identify", "-format", "%w %h", input_path]).decode().strip().split()
if len(dim_res) >= 2:
w, h = dim_res[0], dim_res[1]
svg_content = f'''<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg width="{w}" height="{h}" viewBox="0 0 {w} {h}" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<image width="{w}" height="{h}" xlink:href="data:image/png;base64,{b64_data}"/>
</svg>'''
with open(os.path.join(OUTPUT_DIR, f"{base_name}.svg"), "w") as svg_file:
svg_file.write(svg_content)
print(f"Successfully converted {base_name}")
except Exception as e:
print(f"Error processing {base_name}: {e}")
print("All conversions completed.")

View File

@@ -0,0 +1,43 @@
from app import app, MonthlySummaryAnalysis
from database.manager import DatabaseManager
from sqlalchemy import func, case, desc
# Mocking the filter logic since it's an inner function
def apply_filters_debug(q, year=None, month=None, ignore_year=False):
print(f"Applying filters: year={year}, ignore_year={ignore_year}")
if year and not ignore_year:
print(" -> Filtering by year")
q = q.filter(MonthlySummaryAnalysis.year == year)
if month:
q = q.filter(MonthlySummaryAnalysis.month == month)
return q
db_manager = DatabaseManager()
session = db_manager.get_session()
year = 2025
month = None # Simulating the curl request
print("--- Debugging Div Dist Query ---")
div_dist_q = session.query(
MonthlySummaryAnalysis.division,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales'),
func.sum(case((MonthlySummaryAnalysis.year == 2024, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2024'),
func.sum(case((MonthlySummaryAnalysis.year == 2025, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2025')
).group_by(MonthlySummaryAnalysis.division).order_by(desc('sales')).limit(12)
div_dist_q = apply_filters_debug(div_dist_q, year=year, ignore_year=True)
print("SQL:", div_dist_q.statement.compile(compile_kwargs={"literal_binds": True}))
results = div_dist_q.all()
print(f"Results Count: {len(results)}")
if results:
print("First result:", results[0])
print("\n--- Debugging Regular Filter (Comparison) ---")
# What happens if we DO filter by year?
div_dist_q_filtered = session.query(
MonthlySummaryAnalysis.division,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales')
).group_by(MonthlySummaryAnalysis.division).order_by(desc('sales')).limit(12)
div_dist_q_filtered = apply_filters_debug(div_dist_q_filtered, year=year, ignore_year=False)
print(f"Filtered Results Count: {len(div_dist_q_filtered.all())}")

View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
郵件問題診斷工具
檢查 SMTP 設定並提供修正建議
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import config
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def diagnose_smtp_config():
"""診斷 SMTP 設定"""
print("=" * 80)
print("📧 郵件設定診斷工具")
print("=" * 80)
print("\n1⃣ 當前 SMTP 設定:")
print("-" * 80)
print(f" SMTP 伺服器: {config.EMAIL_HOST}")
print(f" SMTP 埠號: {config.EMAIL_PORT}")
print(f" 寄件者: {config.EMAIL_SENDER}")
print(f" 登入帳號: {config.EMAIL_HOST_USER}")
print(f" 密碼: {'*' * len(config.EMAIL_HOST_PASSWORD)}")
# 判斷設定是否合理
print("\n2⃣ 設定分析:")
print("-" * 80)
is_pchome_email = '@pchome.tw' in config.EMAIL_HOST_USER
is_gmail_smtp = 'gmail.com' in config.EMAIL_HOST
if is_pchome_email and is_gmail_smtp:
print(" ❌ 錯誤:使用 PChome 郵件地址但設定 Gmail SMTP 伺服器")
print(" ⚠️ 這會導致認證失敗,郵件無法發送")
print("\n 💡 解決方案:")
print(" 請向 PChome IT 部門詢問以下資訊:")
print(" - SMTP 伺服器地址例如smtp.pchome.tw 或 mail.pchome.tw")
print(" - SMTP 埠號(通常是 587 或 465")
print(" - 是否需要使用 TLS/SSL")
print(" - 認證方式(帳號密碼或其他)")
return False
elif is_pchome_email:
print(" ⚠️ 使用 PChome 郵件,但 SMTP 伺服器不是 Gmail")
print(f" 當前 SMTP: {config.EMAIL_HOST}")
print(" 請確認此伺服器是否正確")
elif is_gmail_smtp:
print(" 使用 Gmail SMTP 伺服器")
if '@gmail.com' not in config.EMAIL_HOST_USER:
print(" ⚠️ 但寄件者不是 Gmail 地址,可能會有問題")
print("\n3⃣ 測試 SMTP 連線:")
print("-" * 80)
try:
print(" 正在連接 SMTP 伺服器...")
server = smtplib.SMTP(config.EMAIL_HOST, config.EMAIL_PORT, timeout=10)
print(" ✅ 成功連接到 SMTP 伺服器")
print(" 正在啟動 TLS 加密...")
server.starttls()
print(" ✅ TLS 加密啟動成功")
print(" 正在進行身份認證...")
server.login(config.EMAIL_HOST_USER, config.EMAIL_HOST_PASSWORD)
print(" ✅ 身份認證成功")
server.quit()
print("\n 🎉 SMTP 設定完全正確!")
return True
except smtplib.SMTPAuthenticationError as e:
print(f" ❌ 認證失敗: {e}")
print("\n 可能原因:")
print(" 1. 帳號或密碼錯誤")
print(" 2. 使用 PChome 郵件但 SMTP 伺服器設定為 Gmail")
print(" 3. 需要啟用「允許低安全性應用程式」Gmail")
print(" 4. 需要使用應用程式專用密碼Gmail")
return False
except smtplib.SMTPConnectError as e:
print(f" ❌ 連線失敗: {e}")
print("\n 可能原因:")
print(" 1. SMTP 伺服器地址錯誤")
print(" 2. 網路連線問題")
print(" 3. 防火牆封鎖")
return False
except Exception as e:
print(f" ❌ 未知錯誤: {e}")
return False
def test_send_email():
"""測試發送郵件"""
print("\n4⃣ 測試發送郵件:")
print("-" * 80)
try:
# 建立測試郵件
msg = MIMEMultipart('alternative')
msg['From'] = config.EMAIL_SENDER
msg['To'] = config.EMAIL_SENDER # 發給自己
msg['Subject'] = '【測試】MOMO 系統郵件測試'
# 純文字版本
text_body = """
這是一封測試郵件。
如果您收到這封郵件,表示 SMTP 設定正確。
此致
MOMO 監控系統
"""
# HTML 版本
html_body = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body style="font-family: 'Microsoft JhengHei', Arial, sans-serif; padding: 20px;">
<h2 style="color: #2c3e50;">✅ 測試郵件</h2>
<p>這是一封測試郵件。</p>
<p>如果您收到這封郵件,表示 SMTP 設定正確。</p>
<hr>
<p style="color: #7f8c8d; font-size: 12px;">此致<br>MOMO 監控系統</p>
</body>
</html>
"""
text_part = MIMEText(text_body, 'plain', 'utf-8')
html_part = MIMEText(html_body, 'html', 'utf-8')
msg.attach(text_part)
msg.attach(html_part)
# 發送
print(" 正在發送測試郵件...")
with smtplib.SMTP(config.EMAIL_HOST, config.EMAIL_PORT) as server:
server.starttls()
server.login(config.EMAIL_HOST_USER, config.EMAIL_HOST_PASSWORD)
server.sendmail(config.EMAIL_SENDER, [config.EMAIL_SENDER], msg.as_string())
print(" ✅ 測試郵件發送成功!")
print(f"\n 📬 請檢查信箱: {config.EMAIL_SENDER}")
print(" ⚠️ 如果沒收到,請檢查垃圾郵件資料夾")
return True
except Exception as e:
print(f" ❌ 發送失敗: {e}")
return False
def main():
"""主程式"""
# 診斷 SMTP 設定
smtp_ok = diagnose_smtp_config()
if smtp_ok:
# 如果 SMTP 設定正確,測試發送郵件
test_send_email()
else:
print("\n" + "=" * 80)
print("❌ SMTP 設定有問題,無法發送郵件")
print("=" * 80)
print("\n建議處理步驟:")
print("1. 確認您要使用的郵件系統PChome 或 Gmail")
print("2. 取得正確的 SMTP 伺服器設定")
print("3. 更新 .env 檔案中的設定")
print("4. 重新執行此診斷工具")
print("\n" + "=" * 80)
print("診斷完成")
print("=" * 80)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
直接讀取 Excel 檔案,查看實際內容
"""
import pandas as pd
import sys
import os
def read_excel_directly():
"""直接讀取 Excel"""
# 尋找 Excel 檔案
possible_paths = [
'/Users/ogt/momo_pro_system/缺貨測試.xlsx',
'/Users/ogt/缺貨測試.xlsx',
'/Users/ogt/Downloads/缺貨測試.xlsx',
]
excel_path = None
for path in possible_paths:
if os.path.exists(path):
excel_path = path
break
if not excel_path:
print("找不到 Excel 檔案,請提供完整路徑")
print("使用方式: python3 direct_read_excel.py <Excel檔案路徑>")
return
print("=" * 80)
print(f"讀取 Excel: {excel_path}")
print("=" * 80)
try:
# 讀取 Excel
df = pd.read_excel(excel_path)
print(f"\n📊 資料概況:")
print(f" 總行數: {len(df)}")
print(f" 總欄位數: {len(df.columns)}")
print(f"\n📋 所有欄位名稱:")
for i, col in enumerate(df.columns, 1):
print(f" {i:2d}. {col}")
print(f"\n📄 第一行完整數據:")
print("=" * 80)
for col in df.columns:
value = df[col].iloc[0]
value_type = type(value).__name__
print(f"{col:20s}: {value} (類型: {value_type})")
print("\n" + "=" * 80)
# 檢查是否有18個欄位
if len(df.columns) == 18:
print("✅ 欄位數量正確18 個")
else:
print(f"⚠️ 欄位數量不對:{len(df.columns)} 個(預期 18 個)")
except Exception as e:
print(f"\n❌ 錯誤: {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
if len(sys.argv) > 1:
# 使用提供的路徑
excel_path = sys.argv[1]
if os.path.exists(excel_path):
print("=" * 80)
print(f"讀取 Excel: {excel_path}")
print("=" * 80)
df = pd.read_excel(excel_path)
print(f"\n總行數: {len(df)}, 總欄位數: {len(df.columns)}")
print(f"\n欄位清單:")
for i, col in enumerate(df.columns, 1):
print(f" {i}. {col}")
print(f"\n第一行數據:")
for col in df.columns:
print(f" {col}: {df[col].iloc[0]}")
else:
print(f"檔案不存在: {excel_path}")
else:
read_excel_directly()

View File

@@ -0,0 +1,65 @@
import os
import sys
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from utils.image_url_builder import get_product_image_url
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
# 讀取需要修復的商品清單
with open('incorrect_images.txt', 'r') as f:
i_codes = [line.strip() for line in f if line.strip()]
total = len(i_codes)
print(f"🔧 開始修復 {total} 個商品的圖片 URL\n")
print("=" * 100)
success_count = 0
fail_count = 0
for i, i_code in enumerate(i_codes, 1):
product = session.query(Product).filter(Product.i_code == i_code).first()
if product:
if i % 50 == 0 or i <= 10:
print(f"\n[{i}/{total}] 處理: [{i_code}] {product.name[:45]}...")
if product.image_url:
print(f" ❌ 舊: {product.image_url[:70]}...")
# 獲取新的圖片 URL
new_image_url = get_product_image_url(i_code)
if new_image_url:
product.image_url = new_image_url
session.commit()
if i % 50 == 0 or i <= 10:
print(f" ✅ 新: {new_image_url}")
success_count += 1
else:
if i % 50 == 0 or i <= 10:
print(f" ⚠️ 無法獲取")
fail_count += 1
# 每 10 個顯示進度
if i % 10 == 0:
progress = i * 100 // total
print(f" 進度: {i}/{total} ({progress}%) - 成功: {success_count}, 失敗: {fail_count}")
# 延遲避免請求過快
time.sleep(0.3)
else:
fail_count += 1
print("\n" + "=" * 100)
print(f"\n📊 修復結果:")
print(f" ✅ 成功: {success_count}/{total} ({success_count*100//total}%)")
print(f" ❌ 失敗: {fail_count}/{total} ({fail_count*100//total}%)")
session.close()

View File

@@ -0,0 +1,70 @@
import os
import sys
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from utils.image_url_builder import get_product_image_url
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
# 指定的商品編號
i_codes = ['5939587', '14713987', '14237189', '13759520', '13330513', '13731019',
'14049583', '14695038', '14373672', '13646926', '4633943', '6271461', '13559833']
print("🔧 修復第二批商品的圖片 URL:\n")
print("=" * 100)
success_count = 0
fail_count = 0
for i, i_code in enumerate(i_codes, 1):
product = session.query(Product).filter(Product.i_code == i_code).first()
if product:
print(f"\n[{i}/{len(i_codes)}] 處理: [{i_code}] {product.name[:50]}...")
# 檢查當前圖片是否正確
if product.image_url and i_code in product.image_url:
print(f" ⏭️ 圖片已正確: {product.image_url}")
success_count += 1
else:
if product.image_url:
print(f" ❌ 舊圖片: {product.image_url[:80]}...")
else:
print(f" ⚠️ 無圖片")
# 獲取新的圖片 URL
new_image_url = get_product_image_url(i_code)
if new_image_url:
product.image_url = new_image_url
session.commit()
print(f" ✅ 更新: {new_image_url}")
success_count += 1
else:
print(f" ⚠️ 無法獲取圖片(可能已下架或不存在)")
fail_count += 1
# 延遲避免請求過快
if i < len(i_codes):
time.sleep(0.5)
else:
print(f"\n[{i}/{len(i_codes)}] ⚠️ 找不到商品: [{i_code}]")
fail_count += 1
print("\n" + "=" * 100)
print(f"\n📊 修復結果:")
print(f" ✅ 成功: {success_count}")
print(f" ❌ 失敗: {fail_count}")
print(f" 總計: {len(i_codes)}")
session.close()

View File

@@ -0,0 +1,73 @@
import os
import sys
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from utils.image_url_builder import get_product_image_url
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
# 第三批商品編號
i_codes = ['6323590', '7904362', '5954422', '6009343', '4248163', '3331634', '7351198',
'12589232', '11118441', '10755887', '9009264', '9562474', '11038732', '10901361',
'9250857', '5477268', '9600967', '10440331', '10080994', '9900915', '3821684',
'10567236', '13351736', '13351734', '12777975', '11640264', '5894654', '10050962',
'8318051', '11593427', '9857434']
print(f"🔧 修復第三批商品的圖片 URL (共 {len(i_codes)} 個):\n")
print("=" * 100)
success_count = 0
fail_count = 0
skip_count = 0
for i, i_code in enumerate(i_codes, 1):
product = session.query(Product).filter(Product.i_code == i_code).first()
if product:
print(f"\n[{i}/{len(i_codes)}] 處理: [{i_code}] {product.name[:45]}...")
# 檢查當前圖片是否正確
if product.image_url and i_code in product.image_url:
print(f" ⏭️ 圖片已正確")
skip_count += 1
else:
if product.image_url:
print(f" ❌ 舊圖片: {product.image_url[:70]}...")
else:
print(f" ⚠️ 無圖片")
# 獲取新的圖片 URL
new_image_url = get_product_image_url(i_code)
if new_image_url:
product.image_url = new_image_url
session.commit()
print(f" ✅ 更新成功")
success_count += 1
else:
print(f" ⚠️ 無法獲取(可能已下架)")
fail_count += 1
# 延遲避免請求過快
if i < len(i_codes):
time.sleep(0.3)
else:
print(f"\n[{i}/{len(i_codes)}] ⚠️ 找不到商品: [{i_code}]")
fail_count += 1
print("\n" + "=" * 100)
print(f"\n📊 修復結果:")
print(f" ✅ 成功更新: {success_count}")
print(f" ⏭️ 已正確跳過: {skip_count}")
print(f" ❌ 失敗: {fail_count}")
print(f" 總計: {len(i_codes)}")
session.close()

View File

@@ -0,0 +1,20 @@
from database.manager import DatabaseManager
from database.edm_models import PromoProduct
def fix_edm_status():
db = DatabaseManager()
session = db.get_session()
try:
print("🛠️ 正在修正 EDM 商品狀態...")
# 將所有目前標記為 DELISTED 的舊資料改為 SLOT_END讓它們從儀表板消失
# 這樣下一次爬蟲會重新建立正確的狀態
session.query(PromoProduct).filter(PromoProduct.status_change == 'DELISTED').update({PromoProduct.status_change: 'SLOT_END'})
session.commit()
print("✅ 修正完成!請重新執行一次 EDM 爬蟲任務以獲取最新狀態。")
except Exception as e:
print(f"❌ 錯誤: {e}")
finally:
session.close()
if __name__ == "__main__":
fix_edm_status()

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修正資料庫中的 import_date 欄位
將所有 1970-01-01 的日期更新為今天
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import VendorStockout
from datetime import date
def fix_import_dates():
"""修正 import_date 欄位"""
db = VendorDatabaseManager()
session = db.get_session()
try:
# 查詢所有 import_date 為 1970-01-01 的記錄
bad_date = date(1970, 1, 1)
records = session.query(VendorStockout).filter(
VendorStockout.import_date == bad_date
).all()
print("=" * 80)
print(f"修正 import_date 欄位")
print("=" * 80)
if not records:
print("\n✅ 沒有需要修正的記錄(所有日期都正確)")
return
print(f"\n找到 {len(records)} 筆需要修正的記錄")
print(f"將日期從 1970-01-01 更新為今天: {date.today()}")
print("\n確定要更新嗎?(y/n): ", end='')
response = input().strip().lower()
if response != 'y':
print("❌ 已取消")
return
# 更新記錄
today = date.today()
updated_count = 0
for record in records:
record.import_date = today
updated_count += 1
if updated_count % 10 == 0:
print(f" 已更新 {updated_count}/{len(records)} 筆...")
# 提交變更
session.commit()
print(f"\n✅ 成功更新 {updated_count} 筆記錄")
print(f" 新日期: {today}")
print("\n" + "=" * 80)
print("修正完成!")
print("=" * 80)
except Exception as e:
session.rollback()
print(f"\n❌ 錯誤: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
fix_import_dates()

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自動修正資料庫中的 import_date 欄位
將所有 1970-01-01 的日期更新為今天
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import VendorStockout
from datetime import date
def fix_import_dates():
"""修正 import_date 欄位"""
db = VendorDatabaseManager()
session = db.get_session()
try:
# 查詢所有 import_date 為 1970-01-01 的記錄
bad_date = date(1970, 1, 1)
records = session.query(VendorStockout).filter(
VendorStockout.import_date == bad_date
).all()
print("=" * 80)
print(f"自動修正 import_date 欄位")
print("=" * 80)
if not records:
print("\n✅ 沒有需要修正的記錄(所有日期都正確)")
return
print(f"\n找到 {len(records)} 筆需要修正的記錄")
print(f"將日期從 1970-01-01 更新為今天: {date.today()}")
# 更新記錄
today = date.today()
updated_count = 0
for record in records:
record.import_date = today
updated_count += 1
if updated_count % 10 == 0:
print(f" 已更新 {updated_count}/{len(records)} 筆...")
# 提交變更
session.commit()
print(f"\n✅ 成功更新 {updated_count} 筆記錄")
print(f" 新日期: {today}")
# 驗證更新
print("\n驗證更新結果:")
sample_records = session.query(VendorStockout).limit(3).all()
for r in sample_records:
print(f" ID {r.id}: {r.import_date} - {r.product_name[:30]}...")
print("\n" + "=" * 80)
print("✅ 修正完成!現在 Excel 中的日期會正確顯示了")
print("=" * 80)
except Exception as e:
session.rollback()
print(f"\n❌ 錯誤: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
fix_import_dates()

View File

@@ -0,0 +1,115 @@
#!/usr/bin/env python3
"""
修復所有 HTML 文件的導航列,移除重複項目
"""
import os
import re
from pathlib import Path
# 標準導航列模板
STANDARD_NAVBAR = ''' <div class="collapse navbar-collapse" id="navbarNav">
<ul class="navbar-nav me-auto">
<li class="nav-item"><a class="nav-link{active_dashboard}" href="/"><i class="fas fa-chart-line me-1"></i>商品看板</a></li>
<li class="nav-item"><a class="nav-link{active_edm}" href="/edm"><i class="fas fa-bullhorn me-1"></i>活動看板</a></li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle{active_analysis}" href="#" role="button" data-bs-toggle="dropdown">
<i class="fas fa-chart-bar me-1"></i>分析報表
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/sales_analysis"><i class="fas fa-chart-bar me-2"></i>業績分析</a></li>
<li><a class="dropdown-item" href="/daily_sales"><i class="fas fa-calendar-day me-2"></i>當日業績</a></li>
</ul>
</li>
<li class="nav-item"><a class="nav-link{active_vendor}" href="/vendor-stockout"><i class="fas fa-box-open me-1"></i>廠商缺貨</a></li>
<li class="nav-item"><a class="nav-link{active_import}" href="/auto_import"><i class="fas fa-cloud-download-alt me-1"></i>雲端匯入</a></li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle{active_system}" href="#" role="button" data-bs-toggle="dropdown">
<i class="fas fa-cog me-1"></i>系統管理
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/settings"><i class="fas fa-robot me-2"></i>爬蟲管理</a></li>
<li><a class="dropdown-item" href="/system_settings"><i class="fas fa-sliders-h me-2"></i>系統設定</a></li>
<li><a class="dropdown-item" href="/logs"><i class="fas fa-file-alt me-2"></i>系統日誌</a></li>
</ul>
</li>
</ul>
<span class="navbar-text text-light small">
<i class="fas fa-clock me-1"></i>{{ datetime_now }}
</span>
</div>'''
# 頁面 active 標記映射
PAGE_ACTIVE_MAP = {
'dashboard.html': {'active_dashboard': ' active'},
'edm_dashboard.html': {'active_edm': ' {% if current_promo_page in [\'edm\', \'festival\'] %}active{% endif %}'},
'sales_analysis.html': {'active_analysis': ' active'},
'daily_sales.html': {'active_analysis': ' active'},
'vendor_stockout_index.html': {'active_vendor': ' active'},
'vendor_stockout_list.html': {'active_vendor': ' active'},
'vendor_stockout_import.html': {'active_vendor': ' active'},
'vendor_stockout_send_email.html': {'active_vendor': ' active'},
'vendor_stockout_history.html': {'active_vendor': ' active'},
'auto_import_index.html': {'active_import': ' active'},
'settings.html': {'active_system': ' active'},
'system_settings.html': {'active_system': ' active'},
'logs.html': {'active_system': ' active'},
'growth_analysis.html': {},
}
def fix_navbar(file_path):
"""修復單個 HTML 文件的導航列"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 檢查是否有導航列
if '<div class="collapse navbar-collapse" id="navbarNav">' not in content:
print(f"⏭️ 跳過 {file_path.name}(沒有導航列)")
return False
# 準備導航列內容
filename = file_path.name
active_map = PAGE_ACTIVE_MAP.get(filename, {})
navbar = STANDARD_NAVBAR
navbar = navbar.replace('{active_dashboard}', active_map.get('active_dashboard', ''))
navbar = navbar.replace('{active_edm}', active_map.get('active_edm', ''))
navbar = navbar.replace('{active_analysis}', active_map.get('active_analysis', ''))
navbar = navbar.replace('{active_vendor}', active_map.get('active_vendor', ''))
navbar = navbar.replace('{active_import}', active_map.get('active_import', ''))
navbar = navbar.replace('{active_system}', active_map.get('active_system', ''))
# 使用正則替換整個 navbar-collapse div
pattern = r'<div class="collapse navbar-collapse" id="navbarNav">.*?</div>\s*</div>\s*</nav>'
replacement = navbar + '\n </div>\n </nav>'
new_content = re.sub(pattern, replacement, content, flags=re.DOTALL)
if new_content == content:
print(f"⚠️ {file_path.name} 替換失敗")
return False
# 寫回文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
print(f"✅ 已修復 {file_path.name}")
return True
def main():
"""主程序"""
base_dir = Path(__file__).parent
html_files = list(base_dir.glob('*.html'))
print(f"找到 {len(html_files)} 個 HTML 文件")
print("=" * 50)
fixed_count = 0
for file_path in sorted(html_files):
if fix_navbar(file_path):
fixed_count += 1
print("=" * 50)
print(f"\n✅ 完成!修復了 {fixed_count} 個文件")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修復所有 HTML 文件的導航列顏色,從黑色改為淺色主題
"""
import os
import re
from pathlib import Path
def fix_navbar_color(file_path):
"""將導航列從 navbar-dark bg-dark 改為 navbar-light + 自定義背景"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
# 替換導航列的類別
# 從 navbar-dark bg-dark 改為 navbar-light bg-white
content = re.sub(
r'<nav class="navbar navbar-expand-xl navbar-dark bg-dark([^"]*)"',
r'<nav class="navbar navbar-expand-xl navbar-light bg-white\1"',
content
)
# 同時需要更新導航列的樣式,將 shadow-sm 改為更明顯的陰影
# 並添加自定義的 CSS 來美化導航列
# 檢查是否已經有 navbar 的自定義樣式
if '.navbar {' not in content or 'box-shadow: 0 2px 8px' not in content:
# 在 </style> 之前添加導航列樣式
navbar_style = """
/* Navbar Custom Styles */
.navbar {
box-shadow: 0 2px 12px rgba(0,0,0,0.1) !important;
border-bottom: 1px solid #e5e7eb;
}
.navbar-brand {
font-weight: 600;
color: #1f2937 !important;
}
.navbar-light .navbar-nav .nav-link {
color: #4b5563 !important;
font-weight: 500;
transition: all 0.3s;
}
.navbar-light .navbar-nav .nav-link:hover {
color: #6366f1 !important;
}
.navbar-light .navbar-nav .nav-link.active {
color: #6366f1 !important;
font-weight: 600;
}
.navbar-text {
color: #6b7280 !important;
}
"""
content = content.replace('</style>', navbar_style + ' </style>')
# 只有內容改變時才寫入
if content != original_content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return True
return False
except Exception as e:
print(f"處理 {file_path} 時發生錯誤: {e}")
return False
def main():
"""主程序"""
base_dir = Path(__file__).parent
html_files = list(base_dir.glob("*.html"))
updated_count = 0
print(f"找到 {len(html_files)} 個 HTML 文件")
print("=" * 60)
for html_file in html_files:
if fix_navbar_color(html_file):
print(f"✅ 已更新: {html_file.name}")
updated_count += 1
else:
print(f"⏭️ 跳過: {html_file.name}")
print("=" * 60)
print(f"✅ 完成!共更新 {updated_count} 個文件")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
將導航列改為深灰色調,與淺色背景更協調
"""
import os
import re
from pathlib import Path
def apply_dark_gray_navbar(file_path):
"""將導航列改為深灰色調"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
# 將 bg-dark 改為自定義類別
content = re.sub(
r'<nav class="navbar navbar-expand-xl navbar-dark bg-dark([^"]*)"',
r'<nav class="navbar navbar-expand-xl navbar-dark bg-custom-dark\1"',
content
)
# 檢查是否需要添加自定義樣式
if 'bg-custom-dark' in content and '.navbar.bg-custom-dark' not in content:
# 在 </style> 之前添加自定義樣式
custom_navbar_style = """ /* Custom Dark Gray Navbar */
.navbar.bg-custom-dark {
background: linear-gradient(135deg, #1f2937 0%, #374151 100%);
box-shadow: 0 2px 8px rgba(0,0,0,0.15);
border-bottom: 1px solid rgba(255,255,255,0.1);
}
.navbar.bg-custom-dark .navbar-brand {
color: #ffffff;
font-weight: 600;
}
.navbar.bg-custom-dark .navbar-nav .nav-link {
color: rgba(255, 255, 255, 0.85);
font-weight: 500;
}
.navbar.bg-custom-dark .navbar-nav .nav-link:hover {
color: #ffffff;
}
.navbar.bg-custom-dark .navbar-nav .nav-link.active {
color: #ffffff;
font-weight: 600;
}
.navbar.bg-custom-dark .navbar-text {
color: rgba(255, 255, 255, 0.75);
}
"""
content = content.replace(' </style>', custom_navbar_style + ' </style>')
# 只有內容改變時才寫入
if content != original_content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return True
return False
except Exception as e:
print(f"處理 {file_path} 時發生錯誤: {e}")
return False
def main():
"""主程序"""
base_dir = Path(__file__).parent
html_files = list(base_dir.glob("*.html"))
updated_count = 0
print(f"找到 {len(html_files)} 個 HTML 文件")
print("=" * 60)
for html_file in html_files:
if apply_dark_gray_navbar(html_file):
print(f"✅ 已更新: {html_file.name}")
updated_count += 1
else:
print(f"⏭️ 跳過: {html_file.name}")
print("=" * 60)
print(f"✅ 完成!共更新 {updated_count} 個文件為深灰色導航列")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,60 @@
import os
import sys
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from utils.image_url_builder import get_product_image_url
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
# 指定的商品編號
i_codes = ['14672839', '9216349', '13261678', '6676895', '13240823', '13240822', '10911080', '3876337', '8863611']
print("🔧 修復指定商品的圖片 URL:\n")
print("=" * 100)
success_count = 0
fail_count = 0
for i, i_code in enumerate(i_codes, 1):
product = session.query(Product).filter(Product.i_code == i_code).first()
if product:
print(f"\n[{i}/{len(i_codes)}] 處理: [{i_code}] {product.name[:50]}...")
print(f" ❌ 舊圖片: {product.image_url}")
# 獲取新的圖片 URL
new_image_url = get_product_image_url(i_code)
if new_image_url:
product.image_url = new_image_url
session.commit()
print(f" ✅ 更新: {new_image_url}")
success_count += 1
else:
print(f" ⚠️ 無法獲取圖片(可能已下架或不存在)")
fail_count += 1
# 延遲避免請求過快
if i < len(i_codes):
time.sleep(0.5)
else:
print(f"\n[{i}/{len(i_codes)}] ⚠️ 找不到商品: [{i_code}]")
fail_count += 1
print("\n" + "=" * 100)
print(f"\n📊 修復結果:")
print(f" ✅ 成功: {success_count}")
print(f" ❌ 失敗: {fail_count}")
print(f" 總計: {len(i_codes)}")
session.close()

View File

@@ -0,0 +1,197 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修復廠商郵件資料
如果您之前匯入的廠商沒有郵件資料,可以使用此腳本重新匯入
"""
import sys
import os
import pandas as pd
# 加入專案路徑
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
def import_vendor_emails_from_excel(excel_path):
"""
從 Excel 匯入廠商郵件
Args:
excel_path: Excel 檔案路徑
預期欄位:
- 來源供應商編號 (必填)
- 來源供應商名稱 (必填)
- Mail (選填)
"""
print("=" * 60)
print("廠商郵件資料匯入工具")
print("=" * 60)
# 檢查檔案是否存在
if not os.path.exists(excel_path):
print(f"\n❌ 檔案不存在: {excel_path}")
return
print(f"\n讀取檔案: {excel_path}")
try:
# 讀取 Excel
df = pd.read_excel(excel_path)
print(f"總行數: {len(df)}")
print(f"欄位: {list(df.columns)}")
# 驗證欄位
required_columns = ['來源供應商編號', '來源供應商名稱']
missing = [col for col in required_columns if col not in df.columns]
if missing:
print(f"\n❌ 缺少必要欄位: {', '.join(missing)}")
return
if 'Mail' not in df.columns:
print(f"\n⚠️ 未找到 'Mail' 欄位,無法匯入郵件資料")
return
# 初始化資料庫
db = VendorDatabaseManager()
# 統計
vendor_updated = 0
email_added = 0
email_skipped = 0
vendor_not_found = 0
print(f"\n開始處理...")
# 逐行處理
for idx, row in df.iterrows():
vendor_code = str(row.get('來源供應商編號', '')).strip()
vendor_name = str(row.get('來源供應商名稱', '')).strip()
# 支援多種郵件欄位名稱Mail, MAIL, mail, 郵件 等)
email = ''
for mail_col in ['Mail', 'MAIL', 'mail', 'E-mail', 'EMAIL', 'email', '郵件', 'E-Mail']:
if mail_col in df.columns and pd.notna(row.get(mail_col)):
email = str(row.get(mail_col, '')).strip()
break
if not vendor_code or not vendor_name:
continue
# 檢查廠商是否存在
vendor = db.get_vendor_by_code(vendor_code)
if not vendor:
print(f" ⚠️ 廠商不存在: {vendor_code} - {vendor_name}")
vendor_not_found += 1
# 詢問是否要新增
if idx == 0 or vendor_not_found <= 3: # 只問前幾個
response = input(f" 是否要新增此廠商? (y/n): ").lower()
if response == 'y':
new_vendor = db.add_vendor(vendor_code, vendor_name)
if new_vendor:
print(f" ✅ 已新增廠商")
vendor_updated += 1
else:
print(f" ❌ 新增失敗")
continue
else:
continue
else:
continue
else:
vendor_updated += 1
# 如果有郵件地址,新增到廠商郵件表
if email and '@' in email:
# 可能有多個郵件(用逗號或分號分隔)
emails = email.replace(';', ',').split(',')
for single_email in emails:
single_email = single_email.strip()
if single_email and '@' in single_email:
result = db.add_vendor_email(
vendor_code=vendor_code,
email=single_email,
email_type='primary'
)
if result:
print(f"{vendor_code} | 新增郵件: {single_email}")
email_added += 1
else:
print(f" ⚠️ {vendor_code} | 郵件已存在: {single_email}")
email_skipped += 1
# 顯示結果
print("\n" + "=" * 60)
print("匯入完成")
print("=" * 60)
print(f"處理廠商數: {vendor_updated}")
print(f"廠商不存在: {vendor_not_found}")
print(f"新增郵件數: {email_added}")
print(f"跳過郵件數: {email_skipped} (重複)")
print("=" * 60)
except Exception as e:
print(f"\n❌ 處理失敗: {e}")
import traceback
traceback.print_exc()
def show_current_status():
"""顯示目前廠商郵件狀況"""
print("\n" + "=" * 60)
print("目前廠商郵件狀況")
print("=" * 60)
from database.vendor_models import VendorList, VendorEmail
db = VendorDatabaseManager()
session = db.get_session()
try:
vendors = session.query(VendorList).all()
total_emails = session.query(VendorEmail).count()
print(f"\n總廠商數: {len(vendors)}")
print(f"總郵件數: {total_emails}")
vendors_with_email = 0
vendors_without_email = 0
for vendor in vendors:
emails = session.query(VendorEmail).filter_by(vendor_id=vendor.id).all()
if emails:
vendors_with_email += 1
else:
vendors_without_email += 1
print(f"有郵件的廠商: {vendors_with_email}")
print(f"無郵件的廠商: {vendors_without_email}")
if vendors_without_email > 0:
print(f"\n⚠️ 有 {vendors_without_email} 個廠商沒有設定郵件地址")
print(" 建議使用此工具匯入郵件資料")
finally:
session.close()
if __name__ == '__main__':
# 顯示目前狀況
show_current_status()
# 詢問是否要匯入
print("\n" + "=" * 60)
excel_path = input("請輸入 Excel 檔案路徑 (或按 Enter 跳過): ").strip()
if excel_path:
import_vendor_emails_from_excel(excel_path)
# 再次顯示狀況
show_current_status()
else:
print("\n已取消")

View File

@@ -0,0 +1,95 @@
# cSpell:ignore momo bottomicon
"""
修正錯誤圖片腳本
清理資料庫中的錯誤圖片bottomIcon 等),並標記為需要重新抓取
"""
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from database.edm_models import PromoProduct
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
def fix_wrong_images():
"""清理錯誤的圖片 URL"""
print("🔧 開始修正錯誤圖片...\n")
if not os.path.exists(DB_PATH):
print(f"❌ 資料庫檔案不存在: {DB_PATH}")
return
try:
engine = create_engine(f"sqlite:///{DB_PATH}")
Session = sessionmaker(bind=engine)
session = Session()
# 定義錯誤圖片的關鍵字
wrong_patterns = [
'%bottomIcon%',
'%bottomicon%',
'%loader.gif%',
'%blank.png%'
]
print("=" * 60)
print("📦 修正一般商品 (Product)")
print("=" * 60)
fixed_count = 0
for pattern in wrong_patterns:
products = session.query(Product).filter(
Product.image_url.like(pattern)
).all()
for p in products:
print(f"修正: [{p.i_code}] {p.name[:50]}...")
print(f" 錯誤圖片: {p.image_url}")
p.image_url = None # 清空錯誤的圖片 URL
fixed_count += 1
session.commit()
print(f"\n✅ 已修正 {fixed_count} 個商品的錯誤圖片")
# 檢查促銷商品
print("\n" + "=" * 60)
print("🎁 檢查促銷商品 (PromoProduct)")
print("=" * 60)
promo_fixed = 0
for pattern in wrong_patterns:
promos = session.query(PromoProduct).filter(
PromoProduct.image_url.like(pattern)
).all()
for p in promos:
print(f"修正: [{p.i_code}] {p.name[:50]}...")
print(f" 錯誤圖片: {p.image_url}")
p.image_url = None
promo_fixed += 1
session.commit()
print(f"\n✅ 已修正 {promo_fixed} 個促銷商品的錯誤圖片")
print("\n" + "=" * 60)
print("🎉 修正完成!")
print("=" * 60)
print(f"總計修正: {fixed_count + promo_fixed} 個商品")
print("\n💡 提示: 下次執行爬蟲時,這些商品會重新抓取圖片。")
session.close()
except Exception as e:
print(f"❌ 修正失敗: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
fix_wrong_images()

View File

View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""
初始化管理員帳號腳本
用於建立系統的第一個管理員帳號。
執行方式python init_admin.py
預設帳號admin
預設密碼Wooo@2026! (符合密碼複雜度要求)
"""
import os
import sys
# 確保專案根目錄在 sys.path
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.manager import DatabaseManager, get_session
from services.user_service import create_initial_admin
from database.user_models import User
def main():
print("=" * 60)
print(" WOOO TECH - 初始化管理員帳號")
print("=" * 60)
print()
# 初始化資料庫(確保資料表已建立)
print("1. 初始化資料庫...")
try:
db_manager = DatabaseManager()
print(" 資料庫連線成功")
except Exception as e:
print(f" 資料庫連線失敗: {e}")
sys.exit(1)
# 取得資料庫 session
db_session = get_session()
try:
# 檢查是否已有管理員
print()
print("2. 檢查現有管理員...")
existing_admin = db_session.query(User).filter(User.role == User.ROLE_ADMIN).first()
if existing_admin:
print(f" 已存在管理員帳號: {existing_admin.username}")
print()
print(" 如需重設密碼,請使用用戶管理介面或直接修改資料庫。")
print()
return
# 建立管理員
print()
print("3. 建立管理員帳號...")
success, message = create_initial_admin(db_session)
if success:
print(f" {message}")
print()
print("=" * 60)
print(" 管理員帳號建立成功!")
print("=" * 60)
print()
print(" 帳號: admin")
print(" 密碼: Wooo@2026!")
print()
print(" 請登入後立即修改密碼!")
print("=" * 60)
else:
print(f" {message}")
except Exception as e:
print(f" 發生錯誤: {e}")
import traceback
traceback.print_exc()
finally:
db_session.close()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,12 @@
from sqlalchemy import create_engine
from database.models import Base, Category, Product, PriceRecord
from database.edm_models import PromoProduct
from config import DATABASE_PATH
def initialize():
engine = create_engine(DATABASE_PATH)
Base.metadata.create_all(engine)
print("✅ 資料庫結構建立完成 (包含 products, categories, price_records, promo_products)")
if __name__ == "__main__":
initialize()

View File

@@ -0,0 +1,51 @@
# cSpell:ignore momo
import os
import sys
import pandas as pd # type: ignore
from sqlalchemy import create_engine, inspect
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
def inspect_columns():
print("🔍 正在檢視資料庫欄位結構...")
if not os.path.exists(DB_PATH):
print(f"❌ 資料庫檔案不存在: {DB_PATH}")
return
try:
engine = create_engine(f"sqlite:///{DB_PATH}")
inspector = inspect(engine)
table_name = 'realtime_sales_monthly'
if table_name not in inspector.get_table_names():
print(f"❌ 找不到資料表: {table_name}")
return
# 1. 列出所有欄位
columns = [col['name'] for col in inspector.get_columns(table_name)]
print(f"\n📋 資料表 [{table_name}] 包含以下 {len(columns)} 個欄位:")
print("-" * 50)
for i, col in enumerate(columns):
print(f"{i+1}. {col}")
print("-" * 50)
# 2. 顯示前 3 筆資料範例 (幫助判斷欄位內容)
print("\n📊 資料範例 (前 3 筆):")
df = pd.read_sql(f"SELECT * FROM {table_name} LIMIT 3", engine)
print(df.to_string())
print("-" * 50)
# 3. 特別檢查是否有訂單編號相關欄位
potential_order_ids = [c for c in columns if '訂單' in c or '編號' in c or 'ID' in c or 'No' in c]
if potential_order_ids:
print(f"💡 發現可能的訂單編號欄位: {potential_order_ids}")
else:
print("⚠️ 未發現明顯的訂單編號欄位 (可能無法計算真實客單價)")
except Exception as e:
print(f"❌ 讀取失敗: {e}")
if __name__ == "__main__":
inspect_columns()

View File

@@ -0,0 +1,69 @@
import os
import sys
import time
from sqlalchemy import create_engine, inspect, text
# 設定專案路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
def inspect_sales_data():
table_name = 'realtime_sales_monthly'
# 確保路徑與 app.py / clear_sales_table.py 一致
db_path = os.path.join(BASE_DIR, 'data', 'momo_database.db')
print(f"🔍 正在檢查資料庫檔案: {db_path}")
if os.path.exists(db_path):
mtime = time.ctime(os.path.getmtime(db_path))
size = os.path.getsize(db_path)
print(f"📂 檔案狀態: 存在 | 大小: {size} bytes | 最後修改: {mtime}")
if not os.path.exists(db_path):
print("❌ 資料庫檔案不存在!請確認路徑是否正確。")
return
# V-Fix: 直接建立 engine不透過 DatabaseManager 以避免觸發自動修復邏輯 (寫入操作)
engine = create_engine(f'sqlite:///{db_path}')
try:
inspector = inspect(engine)
# 1. 檢查表格是否存在
if not inspector.has_table(table_name):
print(f"✅ 資料表 '{table_name}' 不存在 (確認已從資料庫中移除)。")
print("👉 如果您在網頁上還看得到資料,請務必【重啟 app.py】以清除記憶體快取。")
return
print(f"⚠️ 資料表 '{table_name}' 仍然存在於資料庫中!")
# 2. 檢查資料筆數
with engine.connect() as conn:
count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar()
print(f"📊 目前資料筆數: {count}")
if count > 0:
print("\n--- 前 5 筆資料預覽 ---")
try:
result = conn.execute(text(f"SELECT * FROM {table_name} LIMIT 5"))
keys = list(result.keys())
print(f"欄位: {keys}")
for row in result:
print(row)
# 特別檢查 '狀態' 欄位 (如果存在)
if '狀態' in keys:
print("\n--- '狀態' 欄位值範例 (確認是否為文字) ---")
status_sample = conn.execute(text(f"SELECT DISTINCT \"狀態\" FROM {table_name} LIMIT 10")).fetchall()
print([r[0] for r in status_sample])
except Exception as e:
print(f"❌ 讀取資料失敗: {e}")
else:
print(" 資料表存在但無資料 (空表)。")
except Exception as e:
print(f"❌ 資料庫連接或查詢失敗: {e}")
print("👉 請確認 app.py 是否已關閉 (可能鎖定了資料庫檔案)")
if __name__ == "__main__":
inspect_sales_data()

View File

@@ -0,0 +1,25 @@
import sys
import os
# 加入專案路徑
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__))))
from database.manager import DatabaseManager
from database.models import Base, MonthlySummaryAnalysis
def migrate():
print("Starting database migration for MonthlySummaryAnalysis...")
try:
db = DatabaseManager()
engine = db.engine
# 建立所有尚未存在的資料表 (包含 MonthlySummaryAnalysis)
Base.metadata.create_all(engine)
print("✓ Database migration completed successfully.")
except Exception as e:
print(f"✗ Migration failed: {e}")
sys.exit(1)
if __name__ == "__main__":
migrate()

View File

@@ -0,0 +1,52 @@
import logging
import sys
import os
from sqlalchemy import text
# 確保專案根目錄在 sys.path 中
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.manager import DatabaseManager
from scheduler import run_edm_task
# 設定日誌
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def reset_and_scrape():
"""
清空 promo_products 資料表並重新執行 EDM 爬蟲
"""
db = DatabaseManager()
session = db.get_session()
try:
print("="*50)
logging.info("🗑️ 正在清除所有 EDM (限時搶購) 歷史數據...")
# 執行清空指令 (SQLite 使用 DELETEMySQL/PG 使用 TRUNCATE)
session.execute(text("DELETE FROM promo_products"))
session.commit()
logging.info("✅ 資料庫已清空。")
print("="*50)
logging.info("🚀 重新啟動 EDM 爬蟲任務 (抓取完整數據)...")
# 呼叫 scheduler.py 中的爬蟲函式
run_edm_task()
logging.info("🎉 重置與重新抓取完成!請刷新網頁查看結果。")
print("="*50)
except Exception as e:
logging.error(f"❌ 發生錯誤: {e}")
session.rollback()
finally:
session.close()
if __name__ == "__main__":
# 再次確認提示
confirm = input("⚠️ 警告:這將刪除所有「限時搶購」的歷史價格紀錄,確定要繼續嗎?(y/n): ")
if confirm.lower() == 'y':
reset_and_scrape()
else:
print("已取消操作。")

View File

@@ -0,0 +1,5 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
從備份恢復資料庫
保留 vendor 相關的新數據

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
完全還原導航列到原始狀態(黑色主題,無自定義樣式)
"""
import os
import re
from pathlib import Path
def restore_original_navbar(file_path):
"""將導航列完全還原到原始狀態"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
# 將任何變體改回原始的 navbar-dark bg-dark
content = re.sub(
r'<nav class="navbar navbar-expand-xl navbar-(dark|light) bg-(dark|white|primary)([^"]*)"',
r'<nav class="navbar navbar-expand-xl navbar-dark bg-dark\3"',
content
)
# 移除所有導航列相關的自定義樣式
# 移除 /* Blue Navbar Theme */ 區塊
content = re.sub(
r'\s*/\* Blue Navbar Theme \*/.*?(?=\s*\.|\s*</style>)',
'\n',
content,
flags=re.DOTALL
)
# 移除 /* Navbar Custom Styles */ 區塊
content = re.sub(
r'\s*/\* Navbar Custom Styles \*/.*?(?=\s*\.|\s*</style>)',
'\n',
content,
flags=re.DOTALL
)
# 清理多餘的空行
content = re.sub(r'\n{3,}', '\n\n', content)
# 只有內容改變時才寫入
if content != original_content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return True
return False
except Exception as e:
print(f"處理 {file_path} 時發生錯誤: {e}")
return False
def main():
"""主程序"""
base_dir = Path(__file__).parent
html_files = list(base_dir.glob("*.html"))
updated_count = 0
print(f"找到 {len(html_files)} 個 HTML 文件")
print("=" * 60)
for html_file in html_files:
if restore_original_navbar(html_file):
print(f"✅ 已還原: {html_file.name}")
updated_count += 1
else:
print(f"⏭️ 跳過: {html_file.name}")
print("=" * 60)
print(f"✅ 完成!共還原 {updated_count} 個文件到原始狀態")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
將導航列改回原本的黑色主題
"""
import os
import re
from pathlib import Path
def revert_navbar_to_dark(file_path):
"""將導航列從淺色改回黑色主題"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
# 將淺色導航列改回黑色
content = re.sub(
r'<nav class="navbar navbar-expand-xl navbar-light bg-white([^"]*)"',
r'<nav class="navbar navbar-expand-xl navbar-dark bg-dark\1"',
content
)
# 移除淺色主題的自定義樣式
# 找到 /* Navbar Custom Styles */ 開始到結束的區塊並移除
content = re.sub(
r'\s*/\* Navbar Custom Styles \*/.*?\.navbar-text \{[^}]*\}\s*',
'\n',
content,
flags=re.DOTALL
)
# 只有內容改變時才寫入
if content != original_content:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return True
return False
except Exception as e:
print(f"處理 {file_path} 時發生錯誤: {e}")
return False
def main():
"""主程序"""
base_dir = Path(__file__).parent
html_files = list(base_dir.glob("*.html"))
updated_count = 0
print(f"找到 {len(html_files)} 個 HTML 文件")
print("=" * 60)
for html_file in html_files:
if revert_navbar_to_dark(html_file):
print(f"✅ 已還原: {html_file.name}")
updated_count += 1
else:
print(f"⏭️ 跳過: {html_file.name}")
print("=" * 60)
print(f"✅ 完成!共還原 {updated_count} 個文件")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,125 @@
# cSpell:ignore momo
"""
更新所有監控商品的圖片
使用 i_code 從商品詳情頁直接獲取圖片 URL
"""
import os
import sys
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from utils.image_url_builder import get_product_image_url
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
def update_all_images(batch_size: int = 100, delay: float = 0.5, start_from: int = 0):
"""
更新所有監控商品的圖片
Args:
batch_size: 每批處理的商品數量
delay: 每個請求之間的延遲(秒)
start_from: 從第幾個商品開始處理(用於斷點續傳)
"""
print(f"🔄 開始更新所有監控商品的圖片(從第 {start_from + 1} 個開始)...\n")
if not os.path.exists(DB_PATH):
print(f"❌ 資料庫檔案不存在: {DB_PATH}")
return
try:
engine = create_engine(f"sqlite:///{DB_PATH}")
Session = sessionmaker(bind=engine)
session = Session()
# 查詢所有商品的總數
total_count = session.query(Product).count()
print(f"📊 資料庫中共有 {total_count} 個監控商品")
print(f"📦 本次將處理 {min(batch_size, total_count - start_from)} 個商品\n")
# 查詢指定範圍的商品
products = session.query(Product).offset(start_from).limit(batch_size).all()
if not products:
print("✅ 沒有需要處理的商品!")
return
success_count = 0
fail_count = 0
skip_count = 0
for idx, product in enumerate(products, 1):
global_idx = start_from + idx
print(f"[{global_idx}/{total_count}] 處理: [{product.i_code}] {product.name[:50]}...")
try:
# 從商品詳情頁獲取圖片 URL
image_url = get_product_image_url(product.i_code)
if image_url:
# 檢查圖片是否有變化
if product.image_url != image_url:
product.image_url = image_url
session.commit()
print(f" ✅ 更新: {image_url}")
success_count += 1
else:
print(f" ⏭️ 相同: 圖片已是最新")
skip_count += 1
else:
print(f" ⚠️ 無法獲取圖片(可能已下架或不存在)")
fail_count += 1
# 延遲,避免請求過快
time.sleep(delay)
except Exception as e:
print(f" ❌ 錯誤: {e}")
fail_count += 1
session.rollback()
session.close()
print("\n" + "=" * 60)
print("📊 本批更新結果")
print("=" * 60)
total = len(products)
print(f"✅ 成功更新: {success_count}/{total}")
print(f"⏭️ 跳過(相同): {skip_count}/{total}")
print(f"❌ 失敗: {fail_count}/{total}")
# 顯示進度
completed = start_from + len(products)
progress = completed / total_count * 100
remaining = total_count - completed
print(f"\n📈 總進度: {completed}/{total_count} ({progress:.1f}%)")
if remaining > 0:
print(f"⏳ 剩餘: {remaining} 個商品")
print(f"\n💡 提示: 繼續執行下一批,使用參數 --start-from {completed}")
else:
print(f"\n🎉 所有商品已處理完成!")
except Exception as e:
print(f"❌ 批次更新失敗: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='更新所有監控商品的圖片')
parser.add_argument('--batch-size', type=int, default=100, help='每批處理的商品數量(預設: 100')
parser.add_argument('--delay', type=float, default=0.5, help='每個請求之間的延遲秒數(預設: 0.5')
parser.add_argument('--start-from', type=int, default=0, help='從第幾個商品開始處理(預設: 0')
args = parser.parse_args()
update_all_images(batch_size=args.batch_size, delay=args.delay, start_from=args.start_from)

View File

@@ -0,0 +1,107 @@
# cSpell:ignore momo
"""
批次更新缺少圖片的商品
使用 i_code 從商品詳情頁直接獲取圖片 URL
"""
import os
import sys
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
from utils.image_url_builder import get_product_image_url
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
def update_missing_images(batch_size: int = 50, delay: float = 0.5):
"""
批次更新缺少圖片的商品
Args:
batch_size: 每批處理的商品數量
delay: 每個請求之間的延遲(秒),避免請求過快
"""
print("🔄 開始批次更新缺少圖片的商品...\n")
if not os.path.exists(DB_PATH):
print(f"❌ 資料庫檔案不存在: {DB_PATH}")
return
try:
engine = create_engine(f"sqlite:///{DB_PATH}")
Session = sessionmaker(bind=engine)
session = Session()
# 查詢所有缺圖的商品
missing_products = session.query(Product).filter(
(Product.image_url == None) | (Product.image_url == '')
).limit(batch_size).all()
total = len(missing_products)
print(f"📊 找到 {total} 個缺圖商品(本次處理前 {batch_size} 個)\n")
if total == 0:
print("✅ 沒有需要更新的商品!")
return
success_count = 0
fail_count = 0
for idx, product in enumerate(missing_products, 1):
print(f"[{idx}/{total}] 處理: [{product.i_code}] {product.name[:50]}...")
try:
# 從商品詳情頁獲取圖片 URL
image_url = get_product_image_url(product.i_code)
if image_url:
product.image_url = image_url
session.commit()
print(f" ✅ 成功: {image_url}")
success_count += 1
else:
print(f" ⚠️ 無法獲取圖片(可能已下架或不存在)")
fail_count += 1
# 延遲,避免請求過快
time.sleep(delay)
except Exception as e:
print(f" ❌ 錯誤: {e}")
fail_count += 1
session.rollback()
session.close()
print("\n" + "=" * 60)
print("📊 更新結果")
print("=" * 60)
print(f"✅ 成功: {success_count}/{total} ({success_count/total*100:.1f}%)")
print(f"❌ 失敗: {fail_count}/{total} ({fail_count/total*100:.1f}%)")
if success_count > 0:
print(f"\n💡 提示: 已更新 {success_count} 個商品的圖片!")
if fail_count > 0:
print(f"\n⚠️ 注意: {fail_count} 個商品無法獲取圖片(可能已下架)")
except Exception as e:
print(f"❌ 批次更新失敗: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='批次更新缺圖商品')
parser.add_argument('--batch-size', type=int, default=50, help='每批處理的商品數量(預設: 50')
parser.add_argument('--delay', type=float, default=0.5, help='每個請求之間的延遲秒數(預設: 0.5')
args = parser.parse_args()
update_missing_images(batch_size=args.batch_size, delay=args.delay)

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
批量更新所有 HTML 文件的導航列結構
從 9 個平鋪項目改為 6 個項目(含下拉菜單)
"""
import os
import re
from pathlib import Path
# 新的導航列HTML6個項目含下拉菜單
NEW_NAVBAR_ITEMS = ''' <ul class="navbar-nav me-auto">
<li class="nav-item"><a class="nav-link{active_dashboard}" href="/"><i class="fas fa-chart-line me-1"></i>商品看板</a></li>
<li class="nav-item"><a class="nav-link{active_edm}" href="/edm"><i class="fas fa-bullhorn me-1"></i>活動看板</a></li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle{active_analysis}" href="#" role="button" data-bs-toggle="dropdown">
<i class="fas fa-chart-bar me-1"></i>分析報表
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/sales_analysis"><i class="fas fa-chart-bar me-2"></i>業績分析</a></li>
<li><a class="dropdown-item" href="/daily_sales"><i class="fas fa-calendar-day me-2"></i>當日業績</a></li>
</ul>
</li>
<li class="nav-item"><a class="nav-link{active_vendor}" href="/vendor-stockout"><i class="fas fa-box-open me-1"></i>廠商缺貨</a></li>
<li class="nav-item"><a class="nav-link{active_import}" href="/auto_import"><i class="fas fa-cloud-download-alt me-1"></i>雲端匯入</a></li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle{active_system}" href="#" role="button" data-bs-toggle="dropdown">
<i class="fas fa-cog me-1"></i>系統管理
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/crawler_management"><i class="fas fa-robot me-2"></i>爬蟲管理</a></li>
<li><a class="dropdown-item" href="/settings"><i class="fas fa-cog me-2"></i>爬蟲設定</a></li>
<li><a class="dropdown-item" href="/system_settings"><i class="fas fa-sliders-h me-2"></i>系統設定</a></li>
<li><a class="dropdown-item" href="/logs"><i class="fas fa-file-alt me-2"></i>系統日誌</a></li>
</ul>
</li>
</ul>'''
# 頁面對應的 active 標記
PAGE_ACTIVE_MAP = {
'dashboard.html': {'active_dashboard': ' active'},
'edm_dashboard.html': {'active_edm': '{edm_active}'}, # 保留原有的 Jinja2 邏輯
'sales_analysis.html': {'active_analysis': ' active'},
'daily_sales.html': {'active_analysis': ' active'},
'vendor_stockout_index.html': {'active_vendor': ' active'},
'vendor_stockout_list.html': {'active_vendor': ' active'},
'vendor_stockout_import.html': {'active_vendor': ' active'},
'vendor_stockout_send_email.html': {'active_vendor': ' active'},
'vendor_stockout_history.html': {'active_vendor': ' active'},
'auto_import_index.html': {'active_import': ' active'},
'settings.html': {'active_system': ' active'},
'system_settings.html': {'active_system': ' active'},
'logs.html': {'active_system': ' active'},
}
def update_navbar_in_file(file_path):
"""更新單個 HTML 文件的導航列"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 檢查是否有舊的導航列結構
if '<ul class="navbar-nav me-auto">' not in content:
print(f"⏭️ 跳過 {file_path.name}(沒有找到導航列)")
return False
# 確定這個頁面的 active 標記
filename = file_path.name
active_map = PAGE_ACTIVE_MAP.get(filename, {})
# 準備新的導航列內容
navbar_items = NEW_NAVBAR_ITEMS
navbar_items = navbar_items.replace('{active_dashboard}', active_map.get('active_dashboard', ''))
navbar_items = navbar_items.replace('{active_edm}', active_map.get('active_edm', ''))
navbar_items = navbar_items.replace('{active_analysis}', active_map.get('active_analysis', ''))
navbar_items = navbar_items.replace('{active_vendor}', active_map.get('active_vendor', ''))
navbar_items = navbar_items.replace('{active_import}', active_map.get('active_import', ''))
navbar_items = navbar_items.replace('{active_system}', active_map.get('active_system', ''))
# 特殊處理 edm_dashboard.html 的 Jinja2 邏輯
if filename == 'edm_dashboard.html':
navbar_items = navbar_items.replace('{edm_active}', " {% if current_promo_page in ['edm', 'festival'] %}active{% endif %}")
else:
navbar_items = navbar_items.replace('{edm_active}', '')
# 替換導航列項目(使用正則匹配從 <ul class="navbar-nav me-auto"> 到 </ul>
pattern = r'<ul class="navbar-nav me-auto">.*?</ul>'
new_content = re.sub(pattern, navbar_items, content, flags=re.DOTALL)
if new_content == content:
print(f"⏭️ 跳過 {file_path.name}(沒有變更)")
return False
# 同時確保使用 navbar-expand-xl
new_content = re.sub(r'<nav class="navbar navbar-dark', '<nav class="navbar navbar-expand-xl navbar-dark', new_content)
# 寫回文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
print(f"✅ 已更新 {file_path.name}")
return True
def main():
"""主函數"""
base_dir = Path(__file__).parent
html_files = list(base_dir.glob('*.html'))
# 排除特定文件
exclude_files = {'maintenance.html', 'login.html'}
html_files = [f for f in html_files if f.name not in exclude_files]
print(f"找到 {len(html_files)} 個 HTML 文件")
print("=" * 50)
updated_count = 0
for html_file in sorted(html_files):
if update_navbar_in_file(html_file):
updated_count += 1
print("=" * 50)
print(f"\n✅ 完成!更新了 {updated_count} 個文件")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
商品圖片 URL 批次更新工具
功能:將所有商品的圖片 URL 更新為 MOMO CDN 標準格式
格式https://m.momoshop.com.tw/moscdn/goods/{i_code}_m.webp
"""
from database.manager import DatabaseManager
from database.models import Product
from logger_manager import SystemLogger
from datetime import datetime
log = SystemLogger("ImageURLUpdater").get_logger()
def get_image_path(i_code):
"""
將 i_code 轉換為圖片路徑
規則:從右往左取 3位/3位/剩餘第一部分補0到4位
例如12092813 → 813/092/12 → 0012/092/813
"""
i_code_str = str(i_code)
# 從右往左切分
part3 = i_code_str[-3:] if len(i_code_str) >= 3 else i_code_str.zfill(3) # 最後3位
part2 = i_code_str[-6:-3] if len(i_code_str) > 3 else '000' # 中間3位
part1 = i_code_str[:-6] if len(i_code_str) > 6 else '0' # 前面剩餘
part1 = part1.zfill(4) # 第一部分補0到4位
part2 = part2.zfill(3) # 第二部分補0到3位 (如果需要)
return f'{part1}/{part2}/{part3}'
def update_all_product_images():
"""批次更新所有商品的圖片 URL"""
db = DatabaseManager()
session = db.get_session()
try:
log.info("=" * 80)
log.info("🖼️ 開始批次更新商品圖片 URL")
log.info("=" * 80)
# 查詢所有商品
all_products = session.query(Product).all()
total_count = len(all_products)
log.info(f"📊 總商品數: {total_count}")
# 統計變數
updated_count = 0
no_change_count = 0
error_count = 0
log.info("\n🔄 開始處理商品...")
print() # 空行,讓輸出更清晰
for idx, product in enumerate(all_products, 1):
try:
# 構造標準圖片 URL (使用新格式)
# https://img.momoshop.com.tw/goodsimg/0012/092/813/12092813_OL_m.webp
path = get_image_path(product.i_code)
new_image_url = f"https://img.momoshop.com.tw/goodsimg/{path}/{product.i_code}_OL_m.webp"
# 判斷是否需要更新
needs_update = False
if product.image_url is None:
# 情況 1: 沒有圖片 URL
needs_update = True
reason = "無圖片 URL"
elif product.image_url != new_image_url:
# 情況 2: 圖片 URL 格式不正確
needs_update = True
reason = "格式需更新"
if needs_update:
old_url = product.image_url if product.image_url else ""
product.image_url = new_image_url
product.updated_at = datetime.now()
updated_count += 1
if updated_count % 100 == 0:
log.info(f" ✅ 已更新 {updated_count}/{total_count} 件商品...")
# 詳細記錄(每 500 件顯示一次詳情)
if updated_count % 500 == 1 or updated_count <= 5:
log.debug(f" [{idx}/{total_count}] {reason}")
log.debug(f" 商品: {product.name[:40]}...")
log.debug(f" i_code: {product.i_code}")
log.debug(f" 舊 URL: {old_url[:60]}..." if len(str(old_url)) > 60 else f" 舊 URL: {old_url}")
log.debug(f" 新 URL: {new_image_url}")
else:
no_change_count += 1
except Exception as e:
error_count += 1
log.error(f" ❌ 處理失敗 | i_code: {product.i_code} | 商品: {product.name[:30]} | Error: {e}")
# 提交變更
log.info("\n💾 提交資料庫變更...")
session.commit()
# 輸出統計結果
log.info("\n" + "=" * 80)
log.info("📊 處理結果統計")
log.info("=" * 80)
log.info(f" 總商品數: {total_count}")
log.info(f" ✅ 已更新: {updated_count} ({updated_count/total_count*100:.1f}%)")
log.info(f" ⏭️ 無需更新: {no_change_count} ({no_change_count/total_count*100:.1f}%)")
log.info(f" ❌ 處理失敗: {error_count}")
log.info("=" * 80)
if updated_count > 0:
log.info(f"✅ 成功更新 {updated_count} 件商品的圖片 URL")
else:
log.info("✅ 所有商品圖片 URL 已是最新格式,無需更新")
return {
'total': total_count,
'updated': updated_count,
'no_change': no_change_count,
'error': error_count
}
except Exception as e:
session.rollback()
log.error(f"❌ 批次更新失敗: {e}")
raise
finally:
session.close()
if __name__ == "__main__":
try:
result = update_all_product_images()
# 驗證結果
print("\n" + "=" * 80)
print("🔍 驗證更新結果...")
print("=" * 80)
db = DatabaseManager()
session = db.get_session()
try:
# 檢查還有多少商品沒有圖片 URL
products_without_image = session.query(Product).filter(Product.image_url.is_(None)).count()
# 檢查使用標準格式的商品數
standard_pattern = "https://m.momoshop.com.tw/moscdn/goods/%"
products_with_standard_url = session.query(Product).filter(
Product.image_url.like(standard_pattern)
).count()
total_products = session.query(Product).count()
print(f"無圖片 URL 商品: {products_without_image} ({products_without_image/total_products*100:.1f}%)")
print(f"標準格式商品: {products_with_standard_url} ({products_with_standard_url/total_products*100:.1f}%)")
print("=" * 80)
if products_without_image == 0:
print("✅ 驗證成功:所有商品都有圖片 URL")
else:
print(f"⚠️ 仍有 {products_without_image} 件商品沒有圖片 URL")
finally:
session.close()
except KeyboardInterrupt:
print("\n⚠️ 使用者中斷執行")
except Exception as e:
print(f"\n❌ 執行失敗: {e}")
raise

View File

@@ -0,0 +1,99 @@
#!/usr/bin/env python3
"""批量更新 vendor_stockout 系列頁面的導航列"""
import re
from pathlib import Path
NEW_NAVBAR = ''' <!-- 導航列 -->
<nav class="navbar navbar-expand-xl navbar-dark bg-dark mb-4 shadow-sm">
<div class="container-fluid">
<a class="navbar-brand" href="/"><i class="fas fa-chart-line me-2"></i>MOMO 監控系統</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarNav">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarNav">
<ul class="navbar-nav me-auto">
<li class="nav-item"><a class="nav-link" href="/"><i class="fas fa-chart-line me-1"></i>商品看板</a></li>
<li class="nav-item"><a class="nav-link" href="/edm"><i class="fas fa-bullhorn me-1"></i>活動看板</a></li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown">
<i class="fas fa-chart-bar me-1"></i>分析報表
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/sales_analysis"><i class="fas fa-chart-bar me-2"></i>業績分析</a></li>
<li><a class="dropdown-item" href="/daily_sales"><i class="fas fa-calendar-day me-2"></i>當日業績</a></li>
</ul>
</li>
<li class="nav-item"><a class="nav-link active" href="/vendor-stockout"><i class="fas fa-box-open me-1"></i>廠商缺貨</a></li>
<li class="nav-item"><a class="nav-link" href="/auto_import"><i class="fas fa-cloud-download-alt me-1"></i>雲端匯入</a></li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown">
<i class="fas fa-cog me-1"></i>系統管理
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="/settings"><i class="fas fa-cog me-2"></i>爬蟲設定</a></li>
<li><a class="dropdown-item" href="/system_settings"><i class="fas fa-sliders-h me-2"></i>系統設定</a></li>
<li><a class="dropdown-item" href="/logs"><i class="fas fa-file-alt me-2"></i>系統日誌</a></li>
</ul>
</li>
</ul>
<span class="navbar-text text-light small">
<i class="fas fa-clock me-1"></i><span id="nav-time"></span>
</span>
</div>
</div>
</nav>'''
TIME_SCRIPT = ''' // 更新導航列時鐘
function updateNavTime() {
const now = new Date();
const elem = document.getElementById('nav-time');
if (elem) {
elem.textContent = now.toLocaleString('zh-TW', {
year: 'numeric', month: '2-digit', day: '2-digit',
hour: '2-digit', minute: '2-digit', second: '2-digit'
});
}
}
updateNavTime();
setInterval(updateNavTime, 1000);'''
def update_file(filepath):
"""更新單個文件"""
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
# 替換導航列(從 <!-- 導航列 --> 或 <nav 到 </nav> 之後的空行)
pattern = r'(<!--\s*導航列\s*-->)?\s*<nav class="navbar.*?</nav>\s*'
new_content = re.sub(pattern, NEW_NAVBAR + '\n\n', content, count=1, flags=re.DOTALL)
# 添加時鐘腳本(在 </body> 前)
if 'updateNavTime' not in new_content and '</body>' in new_content:
new_content = new_content.replace('</body>', f' <script>\n{TIME_SCRIPT}\n </script>\n</body>')
if new_content != content:
with open(filepath, 'w', encoding='utf-8') as f:
f.write(new_content)
print(f"✅ 已更新 {filepath.name}")
return True
else:
print(f"⏭️ 跳過 {filepath.name}")
return False
def main():
base_dir = Path(__file__).parent
vendor_files = list(base_dir.glob('vendor_stockout_*.html'))
print(f"找到 {len(vendor_files)} 個廠商缺貨頁面文件")
print("=" * 50)
updated = 0
for filepath in sorted(vendor_files):
if update_file(filepath):
updated += 1
print("=" * 50)
print(f"✅ 完成!更新了 {updated} 個文件")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,43 @@
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
i_codes = ['5939587', '14713987', '14237189', '13759520', '13330513', '13731019',
'14049583', '14695038', '14373672', '13646926', '4633943', '6271461', '13559833']
print("🔍 驗證第二批商品的圖片 URL:\n")
print("=" * 100)
correct = 0
incorrect = 0
for i_code in i_codes:
product = session.query(Product).filter(Product.i_code == i_code).first()
if product:
print(f"\n[{i_code}] {product.name[:50]}...")
print(f" 圖片: {product.image_url}")
if product.image_url and i_code in product.image_url:
print(f" ✅ 正確: URL 包含商品編號")
correct += 1
else:
print(f" ❌ 錯誤: URL 不包含商品編號")
incorrect += 1
print("\n" + "=" * 100)
print(f"\n📊 驗證結果:")
print(f" ✅ 正確: {correct}/{len(i_codes)}")
print(f" ❌ 錯誤: {incorrect}/{len(i_codes)}")
session.close()

View File

@@ -0,0 +1,37 @@
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=engine)
session = Session()
i_codes = ['6323590', '7904362', '5954422', '6009343', '4248163', '3331634', '7351198',
'12589232', '11118441', '10755887', '9009264', '9562474', '11038732', '10901361',
'9250857', '5477268', '9600967', '10440331', '10080994', '9900915', '3821684',
'10567236', '13351736', '13351734', '12777975', '11640264', '5894654', '10050962',
'8318051', '11593427', '9857434']
correct = 0
incorrect = 0
for i_code in i_codes:
product = session.query(Product).filter(Product.i_code == i_code).first()
if product and product.image_url and i_code in product.image_url:
correct += 1
else:
incorrect += 1
if product:
print(f"❌ [{i_code}] {product.image_url}")
print(f"\n📊 驗證結果: ✅ {correct}/{len(i_codes)} 正確")
if incorrect > 0:
print(f"{incorrect} 個錯誤")
session.close()

View File

@@ -0,0 +1,162 @@
# cSpell:ignore momo goodsimg
"""
驗證商品圖片 URL 是否正確對應到 i_code
檢查圖片 URL 中是否包含正確的商品編號
"""
import os
import sys
import re
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 設定路徑
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
from database.models import Product
# 資料庫路徑
DB_PATH = os.path.join(BASE_DIR, 'data', 'momo_database.db')
def verify_image_urls(sample_size: int = 50):
"""
驗證商品圖片 URL 是否正確對應到 i_code
Args:
sample_size: 抽樣檢查的商品數量
"""
print("🔍 驗證商品圖片 URL 是否正確對應...\n")
if not os.path.exists(DB_PATH):
print(f"❌ 資料庫檔案不存在: {DB_PATH}")
return
try:
engine = create_engine(f"sqlite:///{DB_PATH}")
Session = sessionmaker(bind=engine)
session = Session()
# 查詢有圖片的商品(隨機抽樣)
products = session.query(Product).filter(
Product.image_url != None,
Product.image_url != ''
).limit(sample_size).all()
total = len(products)
print(f"📊 抽樣檢查 {total} 個有圖片的商品\n")
print("=" * 80)
correct_count = 0
incorrect_count = 0
unclear_count = 0
for idx, product in enumerate(products, 1):
i_code = product.i_code
image_url = product.image_url
print(f"\n[{idx}/{total}] 商品: [{i_code}]")
print(f" 名稱: {product.name[:50]}...")
print(f" 圖片: {image_url}")
# 檢查圖片 URL 是否包含正確的 i_code
is_correct = check_image_url_match(i_code, image_url)
if is_correct == True:
print(f" ✅ 正確: 圖片 URL 包含商品編號")
correct_count += 1
elif is_correct == False:
print(f" ❌ 錯誤: 圖片 URL 不包含商品編號!")
incorrect_count += 1
else:
print(f" ⚠️ 無法確定: 需要人工檢查")
unclear_count += 1
session.close()
print("\n" + "=" * 80)
print("📊 驗證結果")
print("=" * 80)
print(f"✅ 正確: {correct_count}/{total} ({correct_count/total*100:.1f}%)")
print(f"❌ 錯誤: {incorrect_count}/{total} ({incorrect_count/total*100:.1f}%)")
print(f"⚠️ 無法確定: {unclear_count}/{total} ({unclear_count/total*100:.1f}%)")
if incorrect_count > 0:
print(f"\n⚠️ 警告: 發現 {incorrect_count} 個商品的圖片 URL 不正確!")
print("💡 建議: 執行 update_all_images.py 重新更新所有商品圖片")
elif correct_count == total:
print(f"\n🎉 太好了!所有抽樣商品的圖片 URL 都正確對應到 i_code")
except Exception as e:
print(f"❌ 驗證失敗: {e}")
import traceback
traceback.print_exc()
def check_image_url_match(i_code: str, image_url: str) -> bool | None:
"""
檢查圖片 URL 是否包含正確的 i_code
Args:
i_code: 商品編號
image_url: 圖片 URL
Returns:
True: 正確匹配
False: 不匹配
None: 無法確定
"""
if not image_url:
return None
# MOMO 圖片 URL 的標準格式:
# 1. https://og.momoshop.com.tw/{timestamp}/goodsimg/{XXXX}/{YYY}/{ZZZ}/{i_code}_L.jpg
# 2. https://i{1-8}.momoshop.com.tw/{timestamp}/goodsimg/{path}/{i_code}_O.webp
# 提取 URL 中的商品編號部分
# 純數字商品: 0014/548/538/14548538_L.jpg
# TP 商品: TP000/0829/0000/375/TP00008290000375_O.webp
if i_code.startswith('TP'):
# TP 開頭的商品
# 檢查 URL 是否包含完整的 i_code
if i_code in image_url:
return True
else:
return False
else:
# 純數字商品
# 格式: /0014/548/538/14548538_
try:
code_num = str(int(i_code)) # 去除前導零
# 檢查 URL 是否包含商品編號
if f"/{code_num}_" in image_url or f"/{code_num}." in image_url:
return True
# 檢查分段格式 (例如 /0014/548/538/ 對應 14548538)
code_str = code_num.zfill(8)
part3 = code_str[-3:]
part2 = code_str[-6:-3]
part1 = code_str[:-6].zfill(4)
# 檢查是否包含這個路徑結構
path_pattern = f"/{part1}/{part2}/{part3}/"
if path_pattern in image_url:
return True
return False
except ValueError:
# 無法轉換為數字
return None
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='驗證商品圖片 URL')
parser.add_argument('--sample-size', type=int, default=50, help='抽樣檢查的商品數量(預設: 50')
args = parser.parse_args()
verify_image_urls(sample_size=args.sample_size)

View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
驗證匯入的資料是否正確
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import VendorStockout
import re
def verify_import():
"""驗證匯入資料"""
db = VendorDatabaseManager()
session = db.get_session()
try:
# 查詢所有記錄
records = session.query(VendorStockout).order_by(VendorStockout.id).all()
print("=" * 80)
print(f"驗證匯入資料")
print("=" * 80)
if not records:
print("\n❌ 沒有資料!請確認是否成功匯入")
return
print(f"\n✅ 找到 {len(records)} 筆記錄")
# 檢查第一筆記錄
record = records[0]
print("\n" + "=" * 80)
print(f"第一筆記錄詳細資訊")
print("=" * 80)
print(f"\n📅 日期資訊:")
print(f" import_date: {record.import_date} (類型: {type(record.import_date)})")
if str(record.import_date) == "2026-01-05":
print(f" ✅ 日期正確!")
else:
print(f" ❌ 日期錯誤!應該是 2026-01-05")
print(f"\n👤 組織資訊:")
print(f" 處別: {record.department}")
print(f" 科別: {record.section}")
print(f" PM姓名: {record.pm_name}")
print(f"\n📦 商品資訊:")
print(f" 商品ID: {record.product_code}")
print(f" 商品名稱: {record.product_name[:50]}...")
print(f" 單品/組合商品: {record.product_spec}")
print(f"\n🏢 廠商資訊:")
print(f" 來源供應商編號: {record.vendor_code}")
print(f" 來源供應商名稱: {record.vendor_name}")
print(f"\n📊 數量資訊:")
print(f" 商品可賣量: {record.monthly_sales_qty}")
print(f" 缺貨日期: {record.current_stock}")
print(f" 缺貨商品前30天業績: {record.monthly_sales_amount}")
print(f" 最近30天銷售量: {record.daily_avg_sales}")
print(f" 庫存水位: {record.safe_stock_days}")
print(f"\n📝 備註資訊(解析前):")
print(f" {record.notes}")
# 解析 notes 中的額外欄位
if record.notes:
print(f"\n📝 備註資訊(解析後):")
zone_id_match = re.search(r'區ID:\s*([^,]*)', record.notes)
zone_name_match = re.search(r'區名稱:\s*([^,]*)', record.notes)
borrow_match = re.search(r'借採轉:\s*([^,]*)', record.notes)
stockout_days_match = re.search(r'缺貨天數:\s*([^,]*)', record.notes)
if zone_id_match:
print(f" 區ID: {zone_id_match.group(1).strip()}")
if zone_name_match:
print(f" 區名稱: {zone_name_match.group(1).strip()}")
if borrow_match:
print(f" 借採轉: {borrow_match.group(1).strip()}")
if stockout_days_match:
print(f" 缺貨天數: {stockout_days_match.group(1).strip()}")
# 檢查所有記錄的日期
print("\n" + "=" * 80)
print(f"檢查所有記錄的日期")
print("=" * 80)
date_counts = {}
for r in records:
date_str = str(r.import_date)
date_counts[date_str] = date_counts.get(date_str, 0) + 1
for date_str, count in sorted(date_counts.items()):
icon = "" if date_str == "2026-01-05" else "⚠️"
print(f" {icon} {date_str}: {count}")
print("\n" + "=" * 80)
print("✅ 驗證完成")
print("=" * 80)
if all(str(r.import_date) == "2026-01-05" for r in records):
print("\n🎉 所有資料的日期都正確!")
print("現在可以發送測試郵件了!")
else:
print("\n⚠️ 部分資料的日期不正確,請檢查")
except Exception as e:
print(f"\n❌ 錯誤: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
verify_import()

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
驗證新增欄位的資料
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from database.vendor_manager import VendorDatabaseManager
from database.vendor_models import VendorStockout
def verify_new_fields():
"""驗證新增欄位"""
db = VendorDatabaseManager()
session = db.get_session()
try:
# 查詢第一筆記錄
record = session.query(VendorStockout).first()
if not record:
print("❌ 沒有資料!")
return
print("=" * 80)
print("完整資料檢查")
print("=" * 80)
print(f"\n📅 日期欄位:")
print(f" 當前日期 (import_date): {record.import_date}")
print(f" 缺貨日期 (stockout_date): {record.stockout_date}")
print(f"\n👤 組織欄位:")
print(f" 處別 (department): {record.department}")
print(f" 科別 (section): {record.section}")
print(f" PM姓名 (pm_name): {record.pm_name}")
print(f" 區ID (zone_id): {record.zone_id}")
print(f" 區名稱 (zone_name): {record.zone_name}")
print(f"\n📦 商品欄位:")
print(f" 商品ID (product_code): {record.product_code}")
print(f" 商品名稱 (product_name): {record.product_name[:50]}...")
print(f" 單品/組合商品 (product_spec): {record.product_spec}")
print(f" 借採轉 (borrow_transfer): {record.borrow_transfer}")
print(f"\n🏢 廠商欄位:")
print(f" 來源供應商編號 (vendor_code): {record.vendor_code}")
print(f" 來源供應商名稱 (vendor_name): {record.vendor_name}")
print(f"\n📊 數量欄位:")
print(f" 商品可賣量 (current_stock): {record.current_stock}")
print(f" 缺貨天數 (stockout_days): {record.stockout_days}")
print(f" 缺貨商品前30天業績 (monthly_sales_amount): {record.monthly_sales_amount}")
print(f" 最近30天銷售量 (monthly_sales_qty): {record.monthly_sales_qty}")
print(f" 庫存水位 (safe_stock_days): {record.safe_stock_days}")
print(f"\n📝 其他:")
print(f" notes: {record.notes}")
# 檢查所有記錄
print("\n" + "=" * 80)
print("檢查所有記錄")
print("=" * 80)
all_records = session.query(VendorStockout).all()
for i, r in enumerate(all_records, 1):
print(f"\n記錄 {i}:")
print(f" 當前日期: {r.import_date}")
print(f" 區ID: {r.zone_id}")
print(f" 區名稱: {r.zone_name}")
print(f" 商品ID: {r.product_code}")
print(f" 借採轉: {r.borrow_transfer}")
print(f" 商品可賣量: {r.current_stock}")
print(f" 缺貨日期: {r.stockout_date}")
print(f" 缺貨天數: {r.stockout_days}")
print(f" 前30天業績: {r.monthly_sales_amount}")
print(f" 最近30天銷量: {r.monthly_sales_qty}")
print(f" 庫存水位: {r.safe_stock_days}")
print("\n" + "=" * 80)
except Exception as e:
print(f"\n❌ 錯誤: {e}")
import traceback
traceback.print_exc()
finally:
session.close()
if __name__ == '__main__':
verify_new_fields()