
こんにちは。AI coordinatorの清水です。今日は雨なので、今までやろうやろうと思っていたラズパイでYOLO11を動かすを試してみたいと思います。しかもスマホでも見れるようにしたいと思います。
https://github.com/ai-coordinator/RaspberryPi_yolo11
いまさらなんだが、ラズパイでYOLOを試したことがなかったので、どのくらい高速に動くのか検証してみました。現時点でYOLO12まであるようですが、まだメインはYOLO11のようなので、こちらをベースに起動してみたいと思います。
ラズパイのセットアップから
まあ、ここのやり方は色々なサイトに紹介があるのでので、適当に参考して頂ければと思います。最近のラズパイは以前と比べて価格が高いので気軽に試せなくなっているのは技術普及の阻害になっていると思っています。私も以前ほど気軽に購入できなくなりました。
環境は
・ラズパイ5本体
・OSはラズパイOSを使用。今回はubuntuではないです。
インストールしてラズパイを起動できたら、「Raspberry Piの設定」をすべてオンにしましょう。

(写真が雑ですみません。)
SSHやVNCでリモート接続するためです。これはラズパイ本体のキーボードでコマンドを打ち込んだりコードを修正するのがめんどくさいので、windowsPCやmacからコマンドやコード修正できるようするために必要です。
YOLO11のセットアップ
YOLOは”Ultralytics”が滅茶苦茶使いやすく提供してくださっているので楽勝に使えるようになりました。以前のYOLOはdarknetと言ってセットアップが結構大変でしたが、Ultralyticsの登場で一気に物体検出が簡単になりましたね。
今回はこちらのクイックスタートに則ってYOLO11をセットアップしたいと思います。
https://docs.ultralytics.com/ja/guides/raspberry-pi
dokerなしで進めます。
SSHでラズパイに接続しましょう
私はwindowsからラズパイに接続してセットアップしますが、ラズパイ本体からキーボード接続して実施しても同じ結果になります。
とはいえ最初はラズパイの”IP adress”を知る必要があるので、ifconfigコマンドでIP adressを把握しましょー。
把握したらwindowsのターミナルからssh接続します。
コマンドは
ssh ラズパイ名@ipアドレス
ですね。
よくipアドレスを取得している画面ハードコピーを紹介してるサイトがたくさんありますが、このipアドレスにぼかしを付けている人がたくさんいます。
別に意味ないと思うので、私は普通に掲載します。誰もハックしねーだろって毎回思います。笑

これでラズパイに接続できました。
YOLO11を起動してみましょー
Ultralyticsのクイックスタートガイド通り進めます。
sudo apt update
sudo apt install python3-pip -y
pip install -U pip
三行の目のpip処理で以下のエラーが発生しました。

If you believe this is a mistake, please contact your Python installation or OS distribution provider. You can override this, at the risk of breaking your Python installation or OS, by passing –break-system-packages.
hint: See PEP 668 for the detailed specification.
慣れてない方は大体こういうところで引っ掛かりますね。以前は直接グローバル環境にpipでインストール出来ましたが、最近のOSは仮想環境を作らないと環境構築出来ない仕様になっているため、このようなエラーが発生します。
これを解決するには仮想環境を作る必要がありますね。この仮想環境で構築したコードやなんかは、毎回コードを起動するたびにこの仮想環境を起動する必要があるのでめんどくさいです。でもまあしょうがないですね。
とういことで以下の順で仮想環境を作りましょー。
python3 -m venv ~/myenv
source ~/myenv/bin/activate
pip install -U pip

画像のようになればOKです。
ではクイックスタートに戻って、
pip install ultralytics[export]
暫く待ちましょー。
インストールできたら、リブートしましょー。
sudo reboot
再起動したら、いよいよコードを作成するわけですが、ターミナルからのsshだとコード作成が大変なので、ここからはVS codeから接続します。VS codeに”Remote Development”をインストールすれば、先ほどのSSH接続の要領でラズパイに接続してVS codeが使用できるようになります。
VS codeで先ほど作成した仮想環境myenvディレクトリに接続して、環境を立ち上げましょー
source ~/myenv/bin/activate

このようにコマンド画面の先頭に(myenv)と表示されれば無事に仮想環境が起動しています。
では以下のコードを作成してプログラムを動かしてみます。ファイル目はtest.pyとして
from ultralytics import YOLO
# Load a YOLO11n PyTorch model
model = YOLO("yolo11n.pt")
# Export the model to NCNN format
model.export(format="ncnn") # creates 'yolo11n_ncnn_model'
# Load the exported NCNN model
ncnn_model = YOLO("yolo11n_ncnn_model")
# Run inference
results = ncnn_model("https://ultralytics.com/images/bus.jpg")
python test.pyで起動しましょー

以下のような結果が出ればとりあえず無事動いていることになります。

カメラからリアルタイム物体検出を動かしてみよー
ここからが本題です。リアルタイム物体検出をラズパイでやってみます。
カメラを使えるようにするためにopencvをインストールします。
sudo apt install python3-opencv
pip install opencv-python-headless flask
opencv-python-headless flaskをインストールする理由は、windowsPCでラズパイ上で動くYOLOの結果を見たいからです。
import os
import cv2
import threading
import time
from flask import Flask, Response, render_template_string
from ultralytics import YOLO
# ===== 設定 =====
USE_NCNN = True # 軽量化したい場合 True(初回のみエクスポートに時間がかかる)
MODEL_PT = "yolo11n.pt"
MODEL_NCNN_DIR = "yolo11n_ncnn_model"
CAM_INDEX = 0 # /dev/video0 → 0
IMG_SIZE = 640
CONF = 0.25
FPS_TARGET = 30
app = Flask(__name__)
cap = None
model = None
HTML = """
<!doctype html>
<title>YOLO11 Realtime</title>
<style>body{margin:0;background:#111;color:#eee;font-family:sans-serif} .wrap{max-width:980px;margin:0 auto;padding:16px} img{width:100%;height:auto;display:block;border-radius:8px}</style>
<div class="wrap">
<h2>YOLO11 USB Camera (Realtime)</h2>
<p>このページはRaspberry Pi上のFlaskが配信しています。視聴を止めるにはページを閉じるだけでOKです。</p>
<img src="/video_feed" />
</div>
"""
def get_model():
if USE_NCNN:
if not os.path.isdir(MODEL_NCNN_DIR):
print("[INFO] NCNNモデルがありません。エクスポートします...")
base_model = YOLO(MODEL_PT)
base_model.export(format="ncnn") # yolo11n_ncnn_model/ が作成される
print("[INFO] NCNNエクスポート完了")
return YOLO(MODEL_NCNN_DIR)
else:
return YOLO(MODEL_PT)
def open_camera():
global cap
cap = cv2.VideoCapture(CAM_INDEX, cv2.CAP_V4L2)
# カメラが対応していれば解像度/フレームレートを設定
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
cap.set(cv2.CAP_PROP_FPS, FPS_TARGET)
if not cap.isOpened():
raise RuntimeError("[ERROR] カメラをオープンできませんでした。CAM_INDEX を確認してください。")
def gen_frames():
"""MJPEGストリームを生成"""
frame_interval = 1.0 / max(1, FPS_TARGET)
while True:
ok, frame = cap.read()
if not ok:
print("[WARN] フレーム取得失敗。再試行します。")
time.sleep(0.01)
continue
# 推論
results = model.predict(
source=frame,
imgsz=IMG_SIZE,
conf=CONF,
verbose=False
)
annotated = results[0].plot()
# JPEGにエンコードしてmultipartで送る
ok, buffer = cv2.imencode(".jpg", annotated, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
if not ok:
continue
jpg = buffer.tobytes()
yield (b"--frame\r\n"
b"Content-Type: image/jpeg\r\n\r\n" + jpg + b"\r\n")
# 軽くフレーム間隔を調整
time.sleep(frame_interval)
@app.route("/")
def index():
return render_template_string(HTML)
@app.route("/video_feed")
def video_feed():
return Response(gen_frames(),
mimetype="multipart/x-mixed-replace; boundary=frame")
def main():
global model
print("[INFO] モデル読み込み中...")
model = get_model()
print("[INFO] カメラを起動します...")
open_camera()
print("[INFO] 起動しました。ブラウザでこの端末のIP:5000 にアクセスしてください。例: http://raspberrypi.local:5000")
# Flask をスレッドで起動
server = threading.Thread(target=lambda: app.run(host="0.0.0.0", port=5000, threaded=True, use_reloader=False))
server.daemon = True
server.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
if cap:
cap.release()
print("\n[INFO] 停止しました。")
if __name__ == "__main__":
main()
コードを作成したら、プログラムを起動してみましょー。ブラウザから映像が見れると思います。素晴らしい。
$ python test.py
ブラウザからhttp://127.0.0.1:5000/にアクセスして映像を確認してい見ましょー。スマホからでも見れます。

一回試してみたかったので今回出来て良かったです。
ホスト名を決める
IPアドレスが変わると、いちいちラズパイでifconfigでIPを確認しなければいけなくなるため、ホスト名をつけましょー。
ai@raspberrypi:~ $ sudo systemctl enable --now avahi-daemon
ai@raspberrypi:~ $ sudo hostnamectl set-hostname aicamera01
ai@raspberrypi:~ $
これでホスト名が、”aicamera01″になります。以下のURLで起動できますね。
http://aicamera01.local:5000/
是非参考にしてみてください。
youtubeで紹介しているUIのコードはこちら
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Raspberry Pi 5 + Flask + Ultralytics YOLO11 (NCNN固定, yolo11n-seg)
- 右: 映像(SF風ネオンHUD:bbox+コーナー+ラベル、セグ輪郭)
- 左: サイドバー(FPS/温度/CPU/周波数/メモリ/ディスク/稼働時間/throttled + 閾値スライダー)
- 走査線・中央レティクルなし
- Pi向け最適化: OMP/OpenCVスレッド制御, MJPG入力, 短バッファ, 軽いウォームアップ
"""
# ===== スレッド数を制限(NumPy/OMPより前に設定) =====
import os
os.environ.setdefault("OMP_NUM_THREADS", "4")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
import cv2
try:
cv2.setNumThreads(1) # OpenCV側は1スレに
except Exception:
pass
if hasattr(cv2, "useOptimized"):
try:
cv2.useOptimized(True)
except Exception:
pass
import time
import json
import shutil
import threading
from collections import deque
from subprocess import check_output, CalledProcessError
from flask import Flask, Response, render_template_string, jsonify, request
from ultralytics import YOLO
import numpy as np
# ===== 固定設定 =====
MODEL_PT = "yolo11n-seg.pt" # このモデルだけを使う(NCNNエクスポート&読み込み)
IMG_SIZE = 640
CONF = 0.25 # ← 初期値。左サイドバーのスライダーで変更可(0.00〜1.00)
FPS_TARGET = 30
JPEG_QUALITY = 80
CAM_INDEX = 0
# ===== Flask / 状態 =====
app = Flask(__name__)
cap = None
model = None
model_lock = threading.Lock()
# ライブ統計
latest_fps = 0.0
_latest_fps_lock = threading.Lock()
# conf のロック
conf_lock = threading.Lock()
# ===== HTML(左サイドバー + 右映像 + 閾値スライダー) =====
HTML = """
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>AI coordinator Raspberry Pi vision</title>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<style>
:root{color-scheme:dark; --bg:#0b0b0f; --panel:#111827; --muted:#93a4bf; --accent:#38bdf8; --ok:#22c55e; --warn:#f59e0b; --bad:#ef4444}
*{box-sizing:border-box}
html,body{height:100%}
body{margin:0;background:var(--bg);color:#e8f0ff;font-family:system-ui,Segoe UI,Roboto,Helvetica,Arial,sans-serif}
.layout{display:flex;min-height:100vh}
.sidebar{
width:360px;padding:16px;position:sticky;top:0;align-self:flex-start;
background:linear-gradient(180deg, rgba(24,35,55,.75), rgba(11,11,15,.6));
border-right:1px solid #1f2a44;backdrop-filter:blur(8px)
}
.brand{display:flex;align-items:center;gap:10px;margin-bottom:12px}
.brand .dot{width:10px;height:10px;border-radius:9999px;background:var(--accent);box-shadow:0 0 14px var(--accent)}
h1{font-size:18px;margin:0}
.muted{color:var(--muted);font-size:12px}
.section{margin-top:14px}
.card{
background:var(--panel);border:1px solid #22304d;border-radius:14px;padding:12px;margin-bottom:12px;
box-shadow:0 8px 30px rgba(0,0,0,.35)
}
.row{display:flex;align-items:center;justify-content:space-between;margin:6px 0}
.k{font-size:12px;color:var(--muted)}
.v{font-variant-numeric:tabular-nums}
.bar{height:8px;background:#0f172a;border-radius:9999px;overflow:hidden;border:1px solid #1f2a44}
.bar > span{display:block;height:100%;background:var(--accent);width:0%}
.pill{
display:inline-block;background:#172033;border:1px solid #2b3c66;padding:4px 8px;border-radius:9999px;margin-right:.5rem;font-size:12px
}
.content{flex:1;padding:16px}
.videoWrap{
width:100%;min-height:65vh;display:flex;align-items:center;justify-content:center;
background:radial-gradient(1200px 600px at 70% 30%, rgba(56,189,248,.15), transparent);
border-radius:16px;border:1px solid #22304d;box-shadow:0 8px 30px rgba(0,0,0,.35)
}
img.stream{width:100%;height:auto;display:block;border-radius:12px}
input[type=range]{
-webkit-appearance:none;appearance:none;width:100%;height:8px;border-radius:9999px;background:#0f172a;border:1px solid #22304d;outline:none
}
input[type=range]::-webkit-slider-thumb{
-webkit-appearance:none;appearance:none;width:18px;height:18px;border-radius:50%;
background:var(--accent);box-shadow:0 0 10px var(--accent);cursor:pointer;border:none;margin-top:-5px
}
.rangeHead{display:flex;justify-content:space-between;align-items:center;margin-bottom:6px}
.rangeVal{font-variant-numeric:tabular-nums}
</style>
</head>
<body>
<div class="layout">
<aside class="sidebar">
<div class="brand">
<span class="dot"></span>
<div>
<h1>AI coordinator Raspberry Pi vision</h1>
<div class="muted">Raspberry Pi 5 / NCNN</div>
</div>
</div>
<!-- Stats -->
<div class="section card">
<div class="row"><div class="k">FPS</div><div class="v" id="fps">—</div></div>
<div class="bar"><span id="fpsbar"></span></div>
<div class="row"><div class="k">Backend</div><div class="v">NCNN</div></div>
<div class="row"><div class="k">Model</div><div class="v">yolo11n-seg</div></div>
<div class="row"><div class="k">Input</div><div class="v">{{img}}px, Q{{jpegq}}, {{fps}} FPS tgt</div></div>
</div>
<!-- Detection Settings -->
<div class="section card">
<div class="rangeHead">
<div class="k">Confidence threshold</div>
<div class="v rangeVal"><span id="confv">{{conf}}</span></div>
</div>
<input id="conf" type="range" min="0" max="1" step="0.01" value="{{conf}}"/>
<div class="row"><div class="k">Lower → 検出増 / False Positive増</div><div class="k">Higher → 厳しめ</div></div>
</div>
<!-- Hardware Health -->
<div class="section card">
<div style="margin-bottom:8px" class="k">Hardware Health</div>
<div class="row"><div class="k">Temp</div><div class="v" id="temp">—</div></div>
<div class="bar"><span id="tempbar"></span></div>
<div class="row"><div class="k">CPU %</div><div class="v" id="cpu">—</div></div>
<div class="bar"><span id="cpubar"></span></div>
<div class="row"><div class="k">CPU Freq</div><div class="v" id="freq">—</div></div>
<div class="row"><div class="k">Load (1/5/15)</div><div class="v" id="load">—</div></div>
<div class="row"><div class="k">Mem</div><div class="v" id="mem">—</div></div>
<div class="bar"><span id="membar"></span></div>
<div class="row"><div class="k">Disk</div><div class="v" id="disk">—</div></div>
<div class="bar"><span id="diskbar"></span></div>
<div class="row"><div class="k">Uptime</div><div class="v" id="uptime">—</div></div>
<div class="row"><div class="k">Throttled</div><div class="v" id="throt">—</div></div>
</div>
<div class="section card">
<span class="pill">IMG:{{img}}</span>
<span class="pill">NCNN</span>
<span class="pill">Flask</span>
</div>
</aside>
<main class="content">
<div class="videoWrap">
<img class="stream" src="/video_feed" alt="YOLO stream"/>
</div>
</main>
</div>
<script>
function pct(x){return Math.max(0, Math.min(100, x))}
function setBar(id,val){document.getElementById(id).style.width = pct(val) + "%"}
function fmtBytes(n){return (n/1048576).toFixed(0) + " MiB"}
function pad(n){return n.toString().padStart(2,"0")}
function fmtUptime(s){const d=Math.floor(s/86400),h=Math.floor((s%86400)/3600),m=Math.floor((s%3600)/60);return (d?d+"d ":"")+pad(h)+":"+pad(m)}
// 閾値スライダー → /set_conf にPOST(デバウンス)
const conf = document.getElementById("conf")
const confv = document.getElementById("confv")
let confTimer = null
conf.addEventListener("input", ()=>{
confv.textContent = Number(conf.value).toFixed(2)
if(confTimer) clearTimeout(confTimer)
confTimer = setTimeout(async ()=>{
try{
await fetch("/set_conf", {
method:"POST",
headers:{"Content-Type":"application/json"},
body: JSON.stringify({conf: Number(conf.value)})
})
}catch(e){ console.error(e) }
}, 120)
})
async function tick(){
try{
const r = await fetch("/health"); const j = await r.json()
// FPS
document.getElementById("fps").textContent = j.fps.toFixed(1)
setBar("fpsbar", (j.fps / {{fps}}) * 100)
// Temp
document.getElementById("temp").textContent = j.temp_c ? (j.temp_c.toFixed(1) + " ℃") : "—"
if(j.temp_c!=null) setBar("tempbar", (j.temp_c/85)*100)
// CPU
document.getElementById("cpu").textContent = (j.cpu_percent!=null ? j.cpu_percent.toFixed(0)+" %" : "—")
if(j.cpu_percent!=null) setBar("cpubar", j.cpu_percent)
// Freq
document.getElementById("freq").textContent = j.cpu_mhz ? (j.cpu_mhz.toFixed(0)+" MHz") : "—"
// Load
document.getElementById("load").textContent = j.load ? j.load.map(x=>x.toFixed(2)).join(" / ") : "—"
// Mem
const memUsed = j.mem_total - j.mem_avail
document.getElementById("mem").textContent = fmtBytes(memUsed) + " / " + fmtBytes(j.mem_total)
if(j.mem_total>0) setBar("membar", (memUsed/j.mem_total)*100)
// Disk
document.getElementById("disk").textContent = fmtBytes(j.disk_used) + " / " + fmtBytes(j.disk_total)
if(j.disk_total>0) setBar("diskbar", (j.disk_used/j.disk_total)*100)
// Uptime
document.getElementById("uptime").textContent = fmtUptime(j.uptime)
// Throttled
document.getElementById("throt").textContent = j.throttled || "unknown"
// サーバ側confをUIに同期(他クライアントとズレないように)
if(typeof j.conf === "number"){
const sv = j.conf.toFixed(2)
if(Number(conf.value).toFixed(2) !== sv){
conf.value = sv
confv.textContent = sv
}
}
}catch(e){
console.error(e)
}finally{
setTimeout(tick, 1000)
}
}
tick()
</script>
</body>
</html>
"""
# ===== ユーティリティ =====
def ncnn_dir_for(pt_name: str) -> str:
return pt_name.replace(".pt", "_ncnn_model")
def load_ncnn_model(pt_name: str):
ncnn_dir = ncnn_dir_for(pt_name)
if not os.path.isdir(ncnn_dir):
print(f"[INFO] NCNNモデルがありません。エクスポートします... ({pt_name} → {ncnn_dir})")
YOLO(pt_name).export(format="ncnn")
print("[INFO] NCNNエクスポート完了")
print(f"[INFO] NCNNモデル読み込み: {ncnn_dir}")
return YOLO(ncnn_dir)
def open_camera():
global cap
cap = cv2.VideoCapture(CAM_INDEX, cv2.CAP_V4L2)
try:
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
except Exception:
pass
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
cap.set(cv2.CAP_PROP_FPS, FPS_TARGET)
try:
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
except Exception:
pass
if not cap.isOpened():
raise RuntimeError("[ERROR] カメラをオープンできませんでした。CAM_INDEX を確認してください。")
# ---- Health helpers ----
import shutil
from subprocess import check_output, CalledProcessError
def read_temp_c():
try:
with open("/sys/class/thermal/thermal_zone0/temp") as f:
return int(f.read().strip()) / 1000.0
except Exception:
try:
out = check_output(["vcgencmd","measure_temp"]).decode()
return float(out.split("=")[1].split("'")[0])
except Exception:
return None
def read_cpu_mhz():
try:
with open("/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq") as f:
return int(f.read().strip())/1000.0
except Exception:
return None
def read_loadavg():
try:
return list(os.getloadavg())
except Exception:
return None
def read_meminfo():
total=avail=None
try:
with open("/proc/meminfo") as f:
for line in f:
if line.startswith("MemTotal:"):
total = int(line.split()[1])*1024
elif line.startswith("MemAvailable:"):
avail = int(line.split()[1])*1024
if total is not None and avail is not None:
break
except Exception:
pass
return total or 0, avail or 0
def read_uptime_s():
try:
with open("/proc/uptime") as f:
return float(f.read().split()[0])
except Exception:
return 0.0
def read_disk_usage():
try:
du = shutil.disk_usage("/")
return du.total, du.used, du.free
except Exception:
return 0, 0, 0
_last_cpu_total = None
_last_cpu_idle = None
_cpu_state_lock = threading.Lock()
def read_cpu_percent():
global _last_cpu_total, _last_cpu_idle
try:
with open("/proc/stat") as f:
parts = f.readline().split()
if parts[0] != "cpu":
return None
nums = list(map(int, parts[1:]))
user,nice,system,idle,iowait,irq,softirq,steal,guest,guest_nice = (nums+[0]*10)[:10]
idle_all = idle + iowait
non_idle = user + nice + system + irq + softirq + steal
total = idle_all + non_idle
with _cpu_state_lock:
if _last_cpu_total is None:
_last_cpu_total, _last_cpu_idle = total, idle_all
return None
totald = total - _last_cpu_total
idled = idle_all - _last_cpu_idle
_last_cpu_total, _last_cpu_idle = total, idle_all
if totald <= 0:
return None
return max(0.0, min(100.0, (totald - idled) * 100.0 / totald))
except Exception:
return None
# ===== SF風オーバーレイ(走査線/中央レティクルなし) =====
def sci_fi_overlay(frame, result, alpha_glow=0.55, alpha_hud=0.8):
h, w = frame.shape[:2]
base = frame.copy()
glow = np.zeros_like(frame)
hud = np.zeros_like(frame)
palette = [
( 80, 80, 255), # ネオン赤
(255, 255, 80), # シアン寄り
(255, 160, 80), # ブルー
(160, 255, 80), # ミント
( 80, 160, 255), # オレンジ
(255, 80, 255), # マゼンタ
]
boxes = getattr(result, "boxes", None)
masks = getattr(result, "masks", None)
names = getattr(result, "names", {}) or {}
xyxy = confs = clss = None
if boxes is not None and boxes.xyxy is not None:
xyxy = boxes.xyxy.cpu().numpy().astype(int)
confs = (boxes.conf.cpu().numpy() if boxes.conf is not None else np.ones(len(xyxy)))
clss = (boxes.cls.cpu().numpy().astype(int) if boxes.cls is not None else np.zeros(len(xyxy), dtype=int))
if xyxy is not None:
for i, (x1, y1, x2, y2) in enumerate(xyxy):
x1 = max(0, x1); y1 = max(0, y1)
x2 = min(w-1, x2); y2 = min(h-1, y2)
if x2 <= x1 or y2 <= y1:
continue
cls_id = int(clss[i]) if i < len(clss) else 0
p = palette[cls_id % len(palette)]
cv2.rectangle(glow, (x1, y1), (x2, y2), p, thickness=4, lineType=cv2.LINE_AA)
L = max(12, int(min(x2-x1, y2-y1) * 0.18))
th = 2
cv2.line(hud, (x1, y1), (x1+L, y1), p, th, cv2.LINE_AA)
cv2.line(hud, (x1, y1), (x1, y1+L), p, th, cv2.LINE_AA)
cv2.line(hud, (x2, y1), (x2-L, y1), p, th, cv2.LINE_AA)
cv2.line(hud, (x2, y1), (x2, y1+L), p, th, cv2.LINE_AA)
cv2.line(hud, (x1, y2), (x1+L, y2), p, th, cv2.LINE_AA)
cv2.line(hud, (x1, y2), (x1, y2-L), p, th, cv2.LINE_AA)
cv2.line(hud, (x2, y2), (x2-L, y2), p, th, cv2.LINE_AA)
cv2.line(hud, (x2, y2), (x2, y2-L), p, th, cv2.LINE_AA)
conf = float(confs[i]) if i < len(confs) else 0.0
name = names.get(cls_id, f"id:{cls_id}")
text = f"{name} {conf*100:.0f}%"
(tw, th_text), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_DUPLEX, 0.5, 1)
pad = 6
rx1, ry1 = x1, max(0, y1 - (th_text + pad*2 + 6))
rx2, ry2 = min(w-1, x1 + tw + pad*2), y1 - 2
cv2.rectangle(hud, (rx1, ry1), (rx2, ry2), p, -1, cv2.LINE_AA)
cv2.putText(hud, text, (rx1+pad, ry2-pad-2), cv2.FONT_HERSHEY_DUPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
if masks is not None and getattr(masks, "xy", None) is not None:
for idx, poly in enumerate(masks.xy):
if poly is None or len(poly) == 0:
continue
pts = np.round(poly).astype(np.int32).reshape(-1, 1, 2)
color = palette[idx % len(palette)]
cv2.polylines(glow, [pts], isClosed=True, color=color, thickness=4, lineType=cv2.LINE_AA)
cv2.polylines(hud, [pts], isClosed=True, color=color, thickness=2, lineType=cv2.LINE_AA)
if glow.sum() > 0:
glow_blur = cv2.GaussianBlur(glow, (0,0), sigmaX=7, sigmaY=7)
base = cv2.addWeighted(base, 1.0, glow_blur, alpha_glow, 0)
out = cv2.addWeighted(base, 1.0, hud, alpha_hud, 0)
return out
# ===== ストリーミング =====
def gen_frames():
global latest_fps
frame_interval = 1.0 / max(1, FPS_TARGET)
fps_ma = deque(maxlen=20)
last = time.time()
while True:
ok, frame = cap.read()
if not ok:
print("[WARN] フレーム取得失敗。再試行します。")
time.sleep(0.01)
continue
with model_lock:
m = model
with conf_lock:
conf_val = float(CONF)
results = m.predict(
source=frame,
imgsz=IMG_SIZE,
conf=conf_val,
verbose=False
)
now = time.time()
dt = max(1e-6, now - last)
last = now
fps_ma.append(1.0 / dt)
fps_dyn = sum(fps_ma) / len(fps_ma)
with _latest_fps_lock:
latest_fps = fps_dyn
annotated = sci_fi_overlay(frame, results[0])
# 画面左上にFPS表示(控えめ)
cv2.putText(annotated, f"{fps_dyn:4.1f} FPS", (12, 24),
cv2.FONT_HERSHEY_DUPLEX, 0.55, (240,240,240), 1, cv2.LINE_AA)
ok, buffer = cv2.imencode(".jpg", annotated, [int(cv2.IMWRITE_JPEG_QUALITY), JPEG_QUALITY])
if not ok:
continue
jpg = buffer.tobytes()
yield (b"--frame\r\n"
b"Content-Type: image/jpeg\r\n\r\n" + jpg + b"\r\n")
slack = frame_interval - (time.time() - now)
if slack > 0:
time.sleep(slack)
# ===== ルーティング =====
@app.route("/")
def index():
with conf_lock:
conf_val = float(CONF)
return render_template_string(HTML, img=IMG_SIZE, fps=FPS_TARGET, jpegq=JPEG_QUALITY, conf=f"{conf_val:.2f}")
@app.route("/video_feed")
def video_feed():
return Response(gen_frames(), mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route("/set_conf", methods=["POST"])
def set_conf():
data = request.get_json(silent=True) or {}
try:
val = float(data.get("conf", 0.25))
except Exception:
return jsonify({"ok": False, "error": "invalid value"}), 400
val = max(0.0, min(1.0, val))
with conf_lock:
global CONF
CONF = val
return jsonify({"ok": True, "conf": val})
@app.route("/health")
def health():
with _latest_fps_lock:
fps_val = latest_fps
temp_c = read_temp_c()
cpu_mhz = read_cpu_mhz()
load = read_loadavg()
mem_total, mem_avail = read_meminfo()
uptime = read_uptime_s()
disk_total, disk_used, disk_free = read_disk_usage()
throttled = read_throttled()
cpu_percent = read_cpu_percent()
with conf_lock:
conf_val = float(CONF)
return jsonify({
"fps": fps_val if fps_val is not None else 0.0,
"temp_c": temp_c,
"cpu_mhz": cpu_mhz,
"load": load,
"mem_total": mem_total,
"mem_avail": mem_avail,
"uptime": uptime,
"disk_total": disk_total,
"disk_used": disk_used,
"disk_free": disk_free,
"throttled": throttled,
"cpu_percent": cpu_percent,
"conf": conf_val
})
# ===== 起動処理 =====
def main():
global model, cap
print("[INFO] モデル読み込み中 (NCNN固定, yolo11n-seg)...")
ncnn_dir = ncnn_dir_for(MODEL_PT)
if not os.path.isdir(ncnn_dir):
print(f"[INFO] NCNNモデルがありません。エクスポートします... ({MODEL_PT} → {ncnn_dir})")
YOLO(MODEL_PT).export(format="ncnn")
print("[INFO] NCNNエクスポート完了")
model = YOLO(ncnn_dir)
# 軽いウォームアップ
try:
_ = model.predict(source=np.zeros((480, 640, 3), dtype=np.uint8), imgsz=IMG_SIZE, conf=CONF, verbose=False)
except Exception as e:
print(f"[WARN] ウォームアップ失敗: {e}")
print("[INFO] カメラを起動します...")
open_camera()
print("[INFO] 起動しました。ブラウザでこの端末のIP:5000 にアクセスしてください。例: http://raspberrypi.local:5000")
server = threading.Thread(target=lambda: app.run(host="0.0.0.0", port=5000, threaded=True, use_reloader=False))
server.daemon = True
server.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
if cap:
cap.release()
print("\n[INFO] 停止しました。")
# ---- Health subroutines used above ----
def read_throttled():
try:
out = check_output(["vcgencmd","get_throttled"]).decode().strip()
if "=" in out:
return out.split("=")[1]
return out
except (FileNotFoundError, CalledProcessError):
return None
if __name__ == "__main__":
main()
LEAVE A REPLY