コンプレッサー故障診断システム

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

コンプレッサー簡易OBD診断ツール(プロトタイプ)

1ファイル完結版 / Tkinter GUI

- MockCompressorDevice: 実機の代わりに疑似センサ値を生成

- DiagnosticEngine: センサ値から故障コードを判定

- CompressorObdApp: Tkinter GUI アプリ本体

このままポートフォリオとして提示できる構成を意識しています。

"""

import random

from dataclasses import dataclass

from typing import List

import tkinter as tk

from tkinter import ttk

# ==============================

# データモデル

# ==============================

@dataclass

class CompressorState:

    """コンプレッサーの現在状態を表すデータクラス"""

    discharge_pressure_bar: float   # 吐出圧力

    discharge_temp_c: float         # 吐出温度

    motor_current_a: float          # モータ電流

    ambient_temp_c: float           # 周囲温度

    run_hours: float                # 運転時間

    start_count: int                # 起動回数

    is_running: bool                # 運転中フラグ

@dataclass

class FaultCode:

    """故障コード情報を表すデータクラス"""

    code: str

    severity: str

    message: str

    probable_causes: List[str]

    recommended_actions: List[str]

# ==============================

# モックデバイス(疑似コンプレッサー)

# ==============================

class MockCompressorDevice:

    """

    実機の代わりにコンプレッサーの状態をランダム生成するモック。

    将来ここを Modbus/TCP・シリアル・CAN 等の通信クラスに差し替えれば、

    実機と接続して動くように拡張可能。

    """

    def __init__(self):

        self.run_hours = 1234.5

        self.start_count = 842

    def read_state(self) -> CompressorState:

        """疑似的にコンプレッサーの状態を生成して返す"""

        # 運転中か停止中か(8割くらい運転中)

        is_running = random.random() > 0.2

        discharge_pressure = random.uniform(6.5, 7.5) if is_running else 0.0

        # 吐出温度:通常は 75〜85℃、たまに過熱イベントを発生させる

        base_temp = random.uniform(75, 85)

        if random.random() < 0.1:

            base_temp += random.uniform(10, 30)  # 過熱イベント

        motor_current = random.uniform(40, 60) if is_running else 0.0

        ambient_temp = random.uniform(15, 35)

        # 時間と起動回数を少しずつ増やす

        if is_running:

            self.run_hours += 0.01

        if random.random() < 0.01:

            self.start_count += 1

        return CompressorState(

            discharge_pressure_bar=discharge_pressure,

            discharge_temp_c=base_temp,

            motor_current_a=motor_current,

            ambient_temp_c=ambient_temp,

            run_hours=self.run_hours,

            start_count=self.start_count,

            is_running=is_running,

        )

# ==============================

# 診断エンジン

# ==============================

class DiagnosticEngine:

    """

    センサ値から故障コードを判定するルールベース診断エンジン。

    閾値やルールは将来 JSON/YAML 化して外部ファイルで管理可能。

    """

    def __init__(self):

        self.max_discharge_temp_c = 95.0   # 吐出温度上限

        self.max_motor_current_a = 65.0    # モータ電流上限

        self.min_discharge_pressure_bar = 6.0   # 吐出圧力下限

        self.oil_change_hours = 4000.0     # オイル交換推奨時間

    def diagnose(self, state: CompressorState) -> List[FaultCode]:

        """現在状態から故障コード一覧を生成する"""

        faults: List[FaultCode] = []

        # 1) 吐出温度過昇

        if state.is_running and state.discharge_temp_c > self.max_discharge_temp_c:

            faults.append(FaultCode(

                code="C0101",

                severity="HIGH",

                message="吐出温度が上限を超過しています(過熱)。",

                probable_causes=[

                    "冷却ファン故障または回転不足",

                    "冷却器フィン目詰まり(埃・油汚れ)",

                    "周囲温度が高すぎる",

                    "冷却水不足・冷却水温高すぎ"

                ],

                recommended_actions=[

                    "冷却ファンの動作・回転方向を確認",

                    "冷却器の清掃(エアブロー・洗浄)",

                    "設置環境の換気状況・周囲温度を確認",

                    "水冷タイプなら冷却水量・温度を確認"

                ]

            ))

        # 2) 吐出圧力不足

        if state.is_running and state.discharge_pressure_bar < self.min_discharge_pressure_bar:

            faults.append(FaultCode(

                code="C0201",

                severity="MEDIUM",

                message="吐出圧力が目標値を下回っています。",

                probable_causes=[

                    "吸込側フィルタ目詰まり",

                    "エア漏れ(配管・継手・バルブ)",

                    "圧力設定値の不適切な変更",

                    "アンロード弁の不良"

                ],

                recommended_actions=[

                    "吸込フィルタの目詰まりを点検・清掃",

                    "下流配管のエア漏れチェック(石鹸水など)",

                    "圧力設定値・制御モードを確認",

                    "アンロード弁の動作確認"

                ]

            ))

        # 3) モータ過負荷傾向

        if state.is_running and state.motor_current_a > self.max_motor_current_a:

            faults.append(FaultCode(

                code="C0301",

                severity="HIGH",

                message="モータ電流が許容値を超えています。",

                probable_causes=[

                    "電源電圧の不平衡・低下",

                    "負荷側の固着・過負荷",

                    "ベルト張りすぎ",

                    "モータ内部の劣化"

                ],

                recommended_actions=[

                    "三相電源電圧(不平衡・低電圧)を測定",

                    "圧縮機本体の回転の重さ・異音を確認",

                    "ベルト駆動の場合は張力を点検・調整",

                    "モータの絶縁抵抗・温度記録を確認"

                ]

            ))

        # 4) オイル交換時期超過(簡易判定)

        if state.run_hours > self.oil_change_hours:

            faults.append(FaultCode(

                code="C0401",

                severity="LOW",

                message="オイル交換推奨時間を超過しています。",

                probable_causes=[

                    "オイル交換時期の未実施"

                ],

                recommended_actions=[

                    "運転時間を確認し、オイル交換を実施",

                    "オイルフィルタの同時交換を検討"

                ]

            ))

        return faults

# ==============================

# GUI アプリケーション

# ==============================

class CompressorObdApp:

    """

    コンプレッサー OBD 風 診断GUIアプリ

    - 上部: ライブ値表示

    - 中央: 故障コード一覧 (Treeview)

    - 下部: 選択中のコードの詳細(原因候補・推奨対処)

    """

    UPDATE_INTERVAL_MS = 2000  # データ更新周期 (ミリ秒)

    def __init__(self, master: tk.Tk):

        self.master = master

        self.master.title("コンプレッサー OBD 診断ツール(プロトタイプ)")

        self.master.geometry("900x600")

        # モデル

        self.device = MockCompressorDevice()

        self.engine = DiagnosticEngine()

        self.current_faults: List[FaultCode] = []

        # UI 構築

        self._build_ui()

        # 周期更新開始

        self.update_data()

    def _build_ui(self):

        # ========= 上部:ライブ値パネル =========

        top_frame = ttk.LabelFrame(self.master, text="ライブ値", padding=10)

        top_frame.pack(side=tk.TOP, fill=tk.X, padx=10, pady=5)

        self.var_running = tk.StringVar()

        self.var_pressure = tk.StringVar()

        self.var_temp = tk.StringVar()

        self.var_motor_current = tk.StringVar()

        self.var_ambient = tk.StringVar()

        self.var_run_hours = tk.StringVar()

        self.var_start_count = tk.StringVar()

        # 2列レイアウトでラベル配置

        row = 0

        ttk.Label(top_frame, text="運転状態:").grid(row=row, column=0, sticky=tk.W, padx=5, pady=2)

        ttk.Label(top_frame, textvariable=self.var_running).grid(row=row, column=1, sticky=tk.W, padx=5, pady=2)

        row += 1

        ttk.Label(top_frame, text="吐出圧力 [bar]:").grid(row=row, column=0, sticky=tk.W, padx=5, pady=2)

        ttk.Label(top_frame, textvariable=self.var_pressure).grid(row=row, column=1, sticky=tk.W, padx=5, pady=2)

        row += 1

        ttk.Label(top_frame, text="吐出温度 [℃]:").grid(row=row, column=0, sticky=tk.W, padx=5, pady=2)

        ttk.Label(top_frame, textvariable=self.var_temp).grid(row=row, column=1, sticky=tk.W, padx=5, pady=2)

        row += 1

        ttk.Label(top_frame, text="モータ電流 [A]:").grid(row=row, column=0, sticky=tk.W, padx=5, pady=2)

        ttk.Label(top_frame, textvariable=self.var_motor_current).grid(row=row, column=1, sticky=tk.W, padx=5, pady=2)

        row += 1

        ttk.Label(top_frame, text="周囲温度 [℃]:").grid(row=row, column=0, sticky=tk.W, padx=5, pady=2)

        ttk.Label(top_frame, textvariable=self.var_ambient).grid(row=row, column=1, sticky=tk.W, padx=5, pady=2)

        row += 1

        ttk.Label(top_frame, text="運転時間 [h]:").grid(row=row, column=0, sticky=tk.W, padx=5, pady=2)

        ttk.Label(top_frame, textvariable=self.var_run_hours).grid(row=row, column=1, sticky=tk.W, padx=5, pady=2)

        row += 1

        ttk.Label(top_frame, text="起動回数:").grid(row=row, column=0, sticky=tk.W, padx=5, pady=2)

        ttk.Label(top_frame, textvariable=self.var_start_count).grid(row=row, column=1, sticky=tk.W, padx=5, pady=2)

        # ========= 中央:故障コード一覧 =========

        mid_frame = ttk.LabelFrame(self.master, text="診断結果(故障コード)", padding=10)

        mid_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=10, pady=5)

        columns = ("code", "severity", "message")

        self.tree = ttk.Treeview(mid_frame, columns=columns, show="headings", height=8)

        self.tree.heading("code", text="コード")

        self.tree.heading("severity", text="レベル")

        self.tree.heading("message", text="内容")

        self.tree.column("code", width=80, anchor=tk.CENTER)

        self.tree.column("severity", width=80, anchor=tk.CENTER)

        self.tree.column("message", width=500, anchor=tk.W)

        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        # スクロールバー

        scrollbar = ttk.Scrollbar(mid_frame, orient=tk.VERTICAL, command=self.tree.yview)

        self.tree.configure(yscroll=scrollbar.set)

        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # 選択イベント

        self.tree.bind("<>", self.on_tree_select)

        # ========= 下部:選択中コード詳細 =========

        bottom_frame = ttk.LabelFrame(self.master, text="選択中の故障コード詳細", padding=10)

        bottom_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=True, padx=10, pady=5)

        self.detail_text = tk.Text(bottom_frame, height=8, wrap=tk.WORD)

        self.detail_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        detail_scrollbar = ttk.Scrollbar(bottom_frame, orient=tk.VERTICAL, command=self.detail_text.yview)

        self.detail_text.configure(yscrollcommand=detail_scrollbar.set)

        detail_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    # ==========================

    # データ更新 & 表示更新

    # ==========================

    def update_data(self):

        """デバイスから状態を読み取り、診断を実行してGUIに反映する"""

        state = self.device.read_state()

        faults = self.engine.diagnose(state)

        self.current_faults = faults

        # ライブ値更新

        self.var_running.set("運転中" if state.is_running else "停止中")

        self.var_pressure.set(f"{state.discharge_pressure_bar:.2f}")

        self.var_temp.set(f"{state.discharge_temp_c:.1f}")

        self.var_motor_current.set(f"{state.motor_current_a:.1f}")

        self.var_ambient.set(f"{state.ambient_temp_c:.1f}")

        self.var_run_hours.set(f"{state.run_hours:.1f}")

        self.var_start_count.set(str(state.start_count))

        # 故障コード一覧更新

        for item in self.tree.get_children():

            self.tree.delete(item)

        if faults:

            for idx, f in enumerate(faults):

                self.tree.insert("", "end", iid=str(idx),

                                 values=(f.code, f.severity, f.message))

        else:

            # 異常なしの場合、メッセージだけ表示

            self.tree.insert("", "end", iid="no_fault",

                             values=("-", "-", "異常は検出されていません。"))

        # 詳細欄クリア(次の手で選び直してもらう)

        self.detail_text.delete("1.0", tk.END)

        if not faults:

            self.detail_text.insert(tk.END, "現在検出されている故障コードはありません。")

        # 次回更新を予約

        self.master.after(self.UPDATE_INTERVAL_MS, self.update_data)

    # ==========================

    # イベントハンドラ

    # ==========================

    def on_tree_select(self, event):

        """故障コード一覧で選択されたときに詳細情報を表示する"""

        selection = self.tree.selection()

        if not selection:

            return

        item_id = selection[0]

        if item_id == "no_fault":

            return

        try:

            idx = int(item_id)

        except ValueError:

            return

        if idx < 0 or idx >= len(self.current_faults):

            return

        fault = self.current_faults[idx]

        self.detail_text.delete("1.0", tk.END)

        self.detail_text.insert(tk.END, f"[{fault.severity}] {fault.code} : {fault.message}\n\n")

        self.detail_text.insert(tk.END, "■ 想定される原因候補:\n")

        for c in fault.probable_causes:

            self.detail_text.insert(tk.END, f"  - {c}\n")

        self.detail_text.insert(tk.END, "\n■ 推奨される点検・対処:\n")

        for a in fault.recommended_actions:

            self.detail_text.insert(tk.END, f"  - {a}\n")

# ==============================

# エントリーポイント

# ==============================

def main():

    root = tk.Tk()

    # ttk のテーマを少しマシにするための設定(環境に応じて変更される)

    try:

        style = ttk.Style()

        if "clam" in style.theme_names():

            style.theme_use("clam")

    except Exception:

        pass

    app = CompressorObdApp(root)

    root.mainloop()

if __name__ == "__main__":

    main()

コメント

このブログの人気の投稿

ミライアイ内服薬は薬事法違反で、ほとんど効果がない詐欺ですか?

最高裁での上告理由書受理・却下の判断基準について

裁判官の忌避申立書の作成例