ストレージ内の全てのdbファイルをリストアップし、重複していない部分を追加して、もしくは差分を追加して統合する。
import tkinter as tk
from tkinter import messagebox, scrolledtext
import sqlite3
import os
import threading
class DBScannerMerger:
def __init__(self, root):
self.root = root
self.root.title("SQLite 全検索 & 高度な統合")
self.root.geometry("500x700")
self.db_files = []
# --- UI レイアウト ---
tk.Label(root, text="SQLite データベース統合ツール", font=("Arial", 16, "bold")).pack(pady=10)
# 検索ボタン
self.btn_scan = tk.Button(root, text="1. スマホ内を検索してリストアップ",
command=self.start_scan_thread, bg="#4CAF50", fg="white", font=("Arial", 10, "bold"), height=2)
self.btn_scan.pack(pady=10, padx=20, fill=tk.X)
self.status_label = tk.Label(root, text="待機中...", fg="blue")
self.status_label.pack()
# ファイルリスト表示(複数選択可能)
tk.Label(root, text="統合するファイルを複数選択(タップ)してください:", font=("Arial", 10)).pack(anchor="w", padx=20)
self.listbox = tk.Listbox(root, selectmode=tk.MULTIPLE, width=60, height=15, font=("Courier", 9))
self.listbox.pack(pady=5, padx=20, fill=tk.BOTH, expand=True)
# 実行ボタン
self.btn_merge = tk.Button(root, text="2. 選択したDBを統合実行",
command=self.run_merge, bg="#2196F3", fg="white", font=("Arial", 12, "bold"), height=2, state=tk.DISABLED)
self.btn_merge.pack(pady=10, padx=20, fill=tk.X)
# ログ表示エリア
self.log_area = scrolledtext.ScrolledText(root, height=8, font=("Courier", 8), bg="#f0f0f0")
self.log_area.pack(pady=10, padx=20, fill=tk.X)
tk.Label(root, text="※ リストの最初(一番上)にある選択ファイルが『統合先』になります",
font=("Arial", 9), fg="red").pack(pady=5)
def log(self, message):
"""ログエリアにメッセージを表示"""
self.log_area.insert(tk.END, message + "\n")
self.log_area.see(tk.END)
def start_scan_thread(self):
"""スキャンを別スレッドで開始"""
self.btn_scan.config(state=tk.DISABLED)
self.status_label.config(text="スキャン中... しばらくお待ちください")
self.listbox.delete(0, tk.END)
self.log_area.delete(1.0, tk.END)
self.log("スキャン開始...")
threading.Thread(target=self.scan_db_files, daemon=True).start()
def scan_db_files(self):
# 検索対象:Androidのユーザーストレージ
search_path = "/storage/emulated/0"
found_files = []
try:
for root_dir, dirs, files in os.walk(search_path):
# 不要なディレクトリを除外して高速化
if any(x in root_dir.lower() for x in ["android/data", "cache", ".thumbnails", "temp"]):
continue
for file in files:
if file.endswith(".db"):
full_path = os.path.join(root_dir, file)
found_files.append(full_path)
except Exception as e:
self.log(f"スキャンエラー: {e}")
self.db_files = sorted(found_files) # 名前順にソート
self.root.after(0, self.update_listbox)
def update_listbox(self):
if not self.db_files:
self.status_label.config(text="DBファイルが見つかりませんでした")
self.log("DBファイルが見つかりませんでした。権限を確認してください。")
else:
for path in self.db_files:
self.listbox.insert(tk.END, f" {os.path.basename(path)} | {path}")
self.status_label.config(text=f"{len(self.db_files)} 個のDBを発見しました")
self.btn_merge.config(state=tk.NORMAL)
self.log("スキャン完了。")
self.btn_scan.config(state=tk.NORMAL)
def run_merge(self):
selected_indices = self.listbox.curselection()
if len(selected_indices) < 2:
messagebox.showwarning("警告", "統合には2つ以上のファイルを選択してください")
return
# 選択されたファイルのパスを取得
# インデックス順になるため、リストの上にあるものがbaseになる
base_path = self.db_files[selected_indices[0]]
target_paths = [self.db_files[i] for i in selected_indices[1:]]
self.log(f"--- 統合開始 ---")
self.log(f"統合先: {os.path.basename(base_path)}")
success_count = 0
for target in target_paths:
self.log(f"読込中: {os.path.basename(target)}...")
if self.execute_sql_merge(base_path, target):
success_count += 1
self.log(f" => 成功")
else:
self.log(f" => 失敗(ログを確認してください)")
self.log(f"--- すべての処理が終了しました ---")
messagebox.showinfo("完了", f"統合先: {os.path.basename(base_path)}\n成功したファイル数: {success_count}")
def execute_sql_merge(self, base_path, target_path):
conn = sqlite3.connect(base_path)
cursor = conn.cursor()
try:
# 1. 外部DBをアタッチ
cursor.execute(f"ATTACH DATABASE '{target_path}' AS target")
# 2. 統合元(target)のテーブル一覧と作成SQLを取得
cursor.execute("SELECT name, sql FROM target.sqlite_master WHERE type='table'")
target_tables = cursor.fetchall()
for table_name, create_sql in target_tables:
if table_name.startswith('sqlite_'): continue
# 3. 統合先にテーブルが存在するかチェック
cursor.execute(f"SELECT count(*) FROM main.sqlite_master WHERE type='table' AND name='{table_name}'")
if cursor.fetchone()[0] == 0:
self.log(f" テーブル '{table_name}' を作成します...")
cursor.execute(create_sql)
# 4. データの統合(重複除外)
# EXCEPT句を使用して、既存データと全く同じ行は追加しない
try:
merge_sql = f"""
INSERT INTO main.{table_name}
SELECT * FROM target.{table_name}
EXCEPT
SELECT * FROM main.{table_name}
"""
cursor.execute(merge_sql)
changes = conn.total_changes
self.log(f" テーブル '{table_name}': データを追加しました")
except Exception as e:
self.log(f" エラー (テーブル {table_name}): {e}")
conn.commit()
cursor.execute("DETACH DATABASE target")
return True
except Exception as e:
self.log(f"致命的なエラー: {e}")
return False
finally:
conn.close()
if __name__ == "__main__":
root = tk.Tk()
app = DBScannerMerger(root)
root.mainloop()
コメント
コメントを投稿