Find simultanious devices through Tautulli

This commit is contained in:
Logan Saso
2026-03-29 19:38:37 -07:00
commit c43477c1c4
4 changed files with 674 additions and 0 deletions

592
simul_finder.py Normal file
View File

@@ -0,0 +1,592 @@
#!/usr/bin/env python3
"""
Plex account sharing detector using Tautulli API.
Finds users streaming from multiple devices/locations, with per-device
usage breakdowns and "teleportation" detection (impossible travel between
geographically distant sessions).
"""
import argparse
import math
import os
import sys
from datetime import datetime, timedelta
import requests
# ---------------------------------------------------------------------------
# Tautulli API helpers
# ---------------------------------------------------------------------------
def tautulli_api(base_url: str, api_key: str, cmd: str, **params) -> dict:
params = {k: v for k, v in params.items() if v is not None}
resp = requests.get(
f"{base_url}/api/v2",
params={"apikey": api_key, "cmd": cmd, **params},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
if data.get("response", {}).get("result") != "success":
msg = data.get("response", {}).get("message", "unknown error")
raise RuntimeError(f"Tautulli API error ({cmd}): {msg}")
return data["response"]["data"]
def get_users(base_url: str, api_key: str) -> list[dict]:
data = tautulli_api(base_url, api_key, "get_users_table", length=500)
return data.get("data", [])
def get_history(base_url: str, api_key: str, user_id: int, after: str, length: int = 1000) -> list[dict]:
data = tautulli_api(
base_url, api_key, "get_history",
user_id=user_id, after=after, length=length,
)
return data.get("data", [])
def get_geoip(base_url: str, api_key: str, ip: str) -> dict:
return tautulli_api(base_url, api_key, "get_geoip_lookup", ip_address=ip)
# ---------------------------------------------------------------------------
# Geo / teleportation helpers
# ---------------------------------------------------------------------------
def is_private_ip(ip: str) -> bool:
return ip.startswith(("10.", "192.168.", "172.16.", "127.", "0."))
def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Great-circle distance between two points in km."""
R = 6371.0
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat / 2) ** 2
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
* math.sin(dlon / 2) ** 2)
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
# ~900 km/h is the fastest commercial flight
MAX_TRAVEL_SPEED_KMH = 900
def find_teleportations(
history: list[dict],
ip_geo_cache: dict[str, dict],
) -> list[dict]:
"""Find session transitions where the user moved faster than physically possible."""
# Build a timeline: (timestamp, ip, machine_id, title) sorted by time
events = []
for rec in history:
ip = rec.get("ip_address", "")
if not ip or is_private_ip(ip):
continue
geo = ip_geo_cache.get(ip)
if not geo or not geo.get("latitude") or not geo.get("longitude"):
continue
started = rec.get("started")
stopped = rec.get("stopped")
if started:
events.append({
"time": started,
"ip": ip,
"lat": float(geo["latitude"]),
"lon": float(geo["longitude"]),
"location": geo.get("city", "?"),
"region": geo.get("region", ""),
"machine_id": rec.get("machine_id", "?"),
"platform": rec.get("platform", "?"),
"title": rec.get("full_title") or rec.get("title", "?"),
"event": "start",
})
if stopped:
events.append({
"time": stopped,
"ip": ip,
"lat": float(geo["latitude"]),
"lon": float(geo["longitude"]),
"location": geo.get("city", "?"),
"region": geo.get("region", ""),
"machine_id": rec.get("machine_id", "?"),
"platform": rec.get("platform", "?"),
"title": rec.get("full_title") or rec.get("title", "?"),
"event": "stop",
})
events.sort(key=lambda e: e["time"])
teleportations = []
for i in range(len(events) - 1):
a, b = events[i], events[i + 1]
if a["ip"] == b["ip"]:
continue
dist_km = haversine_km(a["lat"], a["lon"], b["lat"], b["lon"])
if dist_km < 50: # same metro area, ignore
continue
time_diff_h = (b["time"] - a["time"]) / 3600
if time_diff_h <= 0:
time_diff_h = 0.001 # concurrent
speed_kmh = dist_km / time_diff_h
if speed_kmh > MAX_TRAVEL_SPEED_KMH:
teleportations.append({
"from": a,
"to": b,
"dist_km": dist_km,
"time_diff_h": time_diff_h,
"speed_kmh": speed_kmh,
})
return teleportations
# ---------------------------------------------------------------------------
# Analysis
# ---------------------------------------------------------------------------
def same_network(ip_a: str, ip_b: str) -> bool:
"""True if both IPs are on the same network (both private = same household)."""
if is_private_ip(ip_a) and is_private_ip(ip_b):
return True
return ip_a == ip_b
def effective_end(session: dict) -> int:
"""Use started + play_duration as the end time, since 'stopped' can be hours
after playback ended (e.g. fell asleep, left app open)."""
started = session["started"]
duration = session.get("play_duration") or session.get("duration") or 0
if duration > 0:
return started + duration
return session.get("stopped") or started
def find_concurrent_sessions(history: list[dict]) -> list[tuple[dict, dict]]:
"""Find pairs of history records that overlap in time from different devices and networks."""
overlaps = []
for i, a in enumerate(history):
if not a.get("started"):
continue
a_end = effective_end(a)
for b in history[i + 1:]:
if not b.get("started"):
continue
b_end = effective_end(b)
if a.get("machine_id") == b.get("machine_id"):
continue
if same_network(a.get("ip_address", ""), b.get("ip_address", "")):
continue
if a["started"] < b_end and b["started"] < a_end:
overlaps.append((a, b))
return overlaps
def resolve_ips(base_url: str, api_key: str, ips: set[str]) -> dict[str, dict]:
"""Geo-resolve a set of IPs, returning {ip: geo_dict}."""
cache: dict[str, dict] = {}
for ip in ips:
if is_private_ip(ip):
cache[ip] = {"city": "LAN", "region": "", "country": ""}
continue
try:
cache[ip] = get_geoip(base_url, api_key, ip)
except Exception:
cache[ip] = {}
return cache
def fmt_location(geo: dict) -> str:
if not geo:
return "Unknown"
parts = [geo.get("city"), geo.get("region"), geo.get("country")]
return ", ".join(p for p in parts if p) or "Unknown"
def analyze_user(base_url: str, api_key: str, user: dict, after: str, geo: bool) -> dict | None:
uid = user["user_id"]
name = user.get("friendly_name") or user.get("username") or str(uid)
history = get_history(base_url, api_key, uid, after)
if not history:
return None
# --- Device breakdown from history ---
devices: dict[str, dict] = {}
all_ips: set[str] = set()
for rec in history:
mid = rec.get("machine_id", "unknown")
if mid not in devices:
devices[mid] = {
"machine_id": mid,
"platform": rec.get("platform", "?"),
"player": rec.get("player", "?"),
"ips": set(),
"plays": 0,
"duration_sec": 0,
}
devices[mid]["plays"] += 1
devices[mid]["duration_sec"] += rec.get("play_duration") or rec.get("duration") or 0
ip = rec.get("ip_address")
if ip:
devices[mid]["ips"].add(ip)
all_ips.add(ip)
if len(devices) < 2:
return None
# --- Concurrent sessions ---
overlaps = find_concurrent_sessions(history)
overlap_pairs: set[tuple[str, str]] = set()
for a, b in overlaps:
pair = tuple(sorted([a.get("machine_id", "?"), b.get("machine_id", "?")]))
overlap_pairs.add(pair)
# --- Geo + teleportation (when --geo) ---
ip_geo_cache: dict[str, dict] = {}
ip_locations: dict[str, str] = {}
teleportations: list[dict] = []
if geo:
ip_geo_cache = resolve_ips(base_url, api_key, all_ips)
ip_locations = {ip: fmt_location(g) for ip, g in ip_geo_cache.items()}
teleportations = find_teleportations(history, ip_geo_cache)
# --- Scoring ---
n_devices = len(devices)
n_overlaps = len(overlaps)
n_overlap_pairs = len(overlap_pairs)
heavy_devices = sum(1 for d in devices.values() if d["plays"] > 5)
score = (
(n_devices - 1) * 10
+ n_overlap_pairs * 25
+ min(n_overlaps, 50) * 2
+ max(0, heavy_devices - 1) * 15
+ len(teleportations) * 30 # teleportation is a strong signal
)
return {
"user_id": uid,
"name": name,
"history": history,
"devices": devices,
"n_devices": n_devices,
"heavy_devices": heavy_devices,
"total_plays": sum(d["plays"] for d in devices.values()),
"total_duration_sec": sum(d["duration_sec"] for d in devices.values()),
"n_overlaps": n_overlaps,
"n_overlap_pairs": n_overlap_pairs,
"teleportations": teleportations,
"score": score,
"ip_locations": ip_locations,
}
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def fmt_duration(seconds: int) -> str:
h, m = divmod(seconds // 60, 60)
if h > 0:
return f"{h}h {m}m"
return f"{m}m"
def fmt_timestamp(ts: int | float) -> str:
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
def filter_flagged(results: list[dict], min_score: int, concurrent_only: bool) -> list[dict]:
flagged = [r for r in results if r["score"] >= min_score]
if concurrent_only:
flagged = [r for r in flagged if r["n_overlaps"] > 0]
flagged.sort(key=lambda r: r["score"], reverse=True)
return flagged
def print_device_report(flagged: list[dict]):
"""Device-centric view: one row per device with aggregated stats."""
print(f"\n{'='*78}")
print(f" DEVICE BREAKDOWN — {len(flagged)} user(s) flagged")
print(f"{'='*78}\n")
for r in flagged:
print(f" {r['name']} (user_id: {r['user_id']})")
print(f" Score: {r['score']} | Devices: {r['n_devices']} | "
f"Heavy devices: {r['heavy_devices']} | "
f"Concurrent sessions: {r['n_overlaps']} | "
f"Concurrent device pairs: {r['n_overlap_pairs']}")
print(f" Total plays: {r['total_plays']} | "
f"Total watch time: {fmt_duration(r['total_duration_sec'])}")
print()
sorted_devices = sorted(r["devices"].values(), key=lambda d: d["duration_sec"], reverse=True)
print(f" {'Platform':<16} {'Player':<24} {'Plays':>6} {'Watch Time':>11} IPs")
print(f" {'-'*16} {'-'*24} {'-'*6} {'-'*11} {'-'*30}")
for d in sorted_devices:
ips_display = []
for ip in sorted(d["ips"]):
loc = r["ip_locations"].get(ip)
if loc:
ips_display.append(f"{ip} ({loc})")
else:
ips_display.append(ip)
ip_str = ", ".join(ips_display) if ips_display else ""
print(f" {d['platform']:<16} {d['player']:<24} {d['plays']:>6} "
f"{fmt_duration(d['duration_sec']):>11} {ip_str}")
if r["teleportations"]:
print(f"\n TELEPORTATION DETECTED ({len(r['teleportations'])} event(s)):")
for t in r["teleportations"][:10]:
f_ = t["from"]
to = t["to"]
print(f" {fmt_timestamp(f_['time'])} {f_['location']}, {f_['region']}"
f" --> {fmt_timestamp(to['time'])} {to['location']}, {to['region']}")
print(f" {t['dist_km']:.0f} km in {t['time_diff_h']:.1f}h"
f" ({t['speed_kmh']:.0f} km/h — max plausible: {MAX_TRAVEL_SPEED_KMH} km/h)")
print(f" Devices: {f_['platform']} -> {to['platform']}")
print(f"\n{'-'*78}\n")
def print_timeline_report(flagged: list[dict]):
"""Chronological view: sessions listed by date with device/IP info and concurrency markers."""
print(f"\n{'='*78}")
print(f" SESSION TIMELINE — {len(flagged)} user(s) flagged")
print(f"{'='*78}\n")
for r in flagged:
history = r.get("history", [])
if not history:
continue
print(f" {r['name']} (user_id: {r['user_id']}) — score: {r['score']}")
print()
# Sort sessions chronologically
sessions = sorted(
[s for s in history if s.get("started")],
key=lambda s: s["started"],
)
# Pre-compute which sessions are concurrent with another from a different device + network
concurrent_sessions: set[int] = set()
for i, a in enumerate(sessions):
a_end = effective_end(a)
for j, b in enumerate(sessions[i + 1:], start=i + 1):
b_end = effective_end(b)
if a.get("machine_id") == b.get("machine_id"):
continue
if same_network(a.get("ip_address", ""), b.get("ip_address", "")):
continue
if a["started"] < b_end and b["started"] < a_end:
concurrent_sessions.add(i)
concurrent_sessions.add(j)
if not concurrent_sessions:
print(f" (no concurrent sessions)")
print(f"\n{'-'*78}\n")
continue
current_date = None
for idx, s in enumerate(sessions):
if idx not in concurrent_sessions:
continue
ts = datetime.fromtimestamp(s["started"])
date_str = ts.strftime("%Y-%m-%d")
time_str = ts.strftime("%H:%M")
dur = s.get("play_duration") or s.get("duration") or 0
end_ts = effective_end(s)
end_str = datetime.fromtimestamp(end_ts).strftime("%H:%M")
if date_str != current_date:
current_date = date_str
print(f" {date_str}")
ip = s.get("ip_address", "?")
loc = r["ip_locations"].get(ip)
ip_display = f"{ip} ({loc})" if loc else ip
platform = s.get("platform", "?")
player = s.get("player", "?")
title = s.get("full_title") or s.get("title") or "?"
if len(title) > 40:
title = title[:37] + "..."
print(f" {time_str}-{end_str} {fmt_duration(dur):>7}"
f" {platform:<14} {player:<20} {ip_display}")
print(f" {title}")
print(f"\n{'-'*78}\n")
def print_ip_report(user_ip_stats: list[dict], ip_locations: dict[str, str]):
"""Per-user breakdown of usage by IP, sorted by watch time."""
print(f"\n{'='*78}")
print(f" IP USAGE BY USER — {len(user_ip_stats)} user(s)")
print(f"{'='*78}\n")
for u in user_ip_stats:
print(f" {u['name']} (user_id: {u['user_id']})")
total_dur = sum(ip["duration_sec"] for ip in u["ips"].values())
print(f" Total: {u['total_plays']} plays, {fmt_duration(total_dur)} watch time, "
f"{len(u['ips'])} unique IP(s)")
print()
sorted_ips = sorted(u["ips"].values(), key=lambda x: x["duration_sec"], reverse=True)
print(f" {'IP':<22} {'Plays':>6} {'Watch Time':>11} {'Pct':>5} Devices")
print(f" {'-'*22} {'-'*6} {'-'*11} {'-'*5} {'-'*30}")
for ip_info in sorted_ips:
ip = ip_info["ip"]
loc = ip_locations.get(ip)
ip_display = f"{ip} ({loc})" if loc else ip
pct = (ip_info["duration_sec"] / total_dur * 100) if total_dur else 0
devices = ", ".join(sorted(ip_info["platforms"]))
print(f" {ip_display:<22} {ip_info['plays']:>6} "
f"{fmt_duration(ip_info['duration_sec']):>11} {pct:>4.0f}% {devices}")
print(f"\n{'-'*78}\n")
def print_report(results: list[dict], min_score: int, concurrent_only: bool, timeline: bool):
flagged = filter_flagged(results, min_score, concurrent_only)
if not flagged:
print("No users flagged for potential account sharing.")
return
print_device_report(flagged)
if timeline:
print_timeline_report(flagged)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def build_base_url(raw: str) -> str:
"""Accept a hostname, host:port, or full URL and return a base URL."""
if raw.startswith(("http://", "https://")):
return raw.rstrip("/")
return f"http://{raw.rstrip('/')}"
def main():
parser = argparse.ArgumentParser(
description="Detect Plex account sharing via Tautulli",
)
parser.add_argument("--host",
default=os.environ.get("TAUTULLI_HOST") or os.environ.get("TAUTULLI_URL"),
help="Tautulli host or URL (or set TAUTULLI_HOST env var)")
parser.add_argument("--api-key",
default=os.environ.get("TAUTULLI_API_KEY"),
help="Tautulli API key (or set TAUTULLI_API_KEY env var)")
parser.add_argument("--days", type=int, default=30,
help="Look back N days (default: 30)")
parser.add_argument("--min-score", type=int, default=20,
help="Minimum suspicion score to flag (default: 20)")
parser.add_argument("--geo", action="store_true",
help="Enable IP geolocation + teleportation detection (more API calls)")
parser.add_argument("--concurrent-only", action="store_true",
help="Only show users with concurrent sessions from different devices")
parser.add_argument("--timeline", action="store_true",
help="Show a chronological session timeline in addition to the device breakdown")
parser.add_argument("--top-ips", action="store_true",
help="Show per-user IP usage breakdown ranked by watch time")
parser.add_argument("--user", type=str, default=None,
help="Analyze a single user by friendly name")
args = parser.parse_args()
if not args.host or not args.api_key:
print("Error: provide --host and --api-key (or set TAUTULLI_HOST / TAUTULLI_API_KEY).",
file=sys.stderr)
sys.exit(1)
base_url = build_base_url(args.host)
after = (datetime.now() - timedelta(days=args.days)).strftime("%Y-%m-%d")
print(f"Fetching users from Tautulli ({base_url})...")
users = get_users(base_url, args.api_key)
if args.user:
users = [u for u in users if (u.get("friendly_name") or "").lower() == args.user.lower()
or (u.get("username") or "").lower() == args.user.lower()]
if not users:
print(f"User '{args.user}' not found.", file=sys.stderr)
sys.exit(1)
print(f"Analyzing {len(users)} user(s) over the last {args.days} days...")
if args.geo:
print(" (geo lookups enabled — teleportation detection active)")
print()
if args.top_ips:
all_ips_seen: set[str] = set()
user_ip_stats = []
for i, user in enumerate(users):
uid = user["user_id"]
name = user.get("friendly_name") or user.get("username") or "?"
print(f" [{i+1}/{len(users)}] {name}...", end="", flush=True)
try:
history = get_history(base_url, args.api_key, uid, after)
except Exception as e:
print(f" error: {e}")
continue
if not history:
print(" no history")
continue
ips: dict[str, dict] = {}
for rec in history:
ip = rec.get("ip_address", "")
if not ip:
continue
if ip not in ips:
ips[ip] = {"ip": ip, "plays": 0, "duration_sec": 0, "platforms": set()}
ips[ip]["plays"] += 1
ips[ip]["duration_sec"] += rec.get("play_duration") or rec.get("duration") or 0
platform = rec.get("platform", "?")
ips[ip]["platforms"].add(platform)
all_ips_seen.add(ip)
total_plays = sum(d["plays"] for d in ips.values())
user_ip_stats.append({"user_id": uid, "name": name, "ips": ips, "total_plays": total_plays})
print(f" {total_plays} plays, {len(ips)} IP(s)")
ip_locations: dict[str, str] = {}
if args.geo:
print(f"\n Resolving {len(all_ips_seen)} IP(s)...")
geo_cache = resolve_ips(base_url, args.api_key, all_ips_seen)
ip_locations = {ip: fmt_location(g) for ip, g in geo_cache.items()}
# Sort users by total watch time descending
user_ip_stats.sort(
key=lambda u: sum(ip["duration_sec"] for ip in u["ips"].values()), reverse=True
)
print_ip_report(user_ip_stats, ip_locations)
return
results = []
for i, user in enumerate(users):
name = user.get("friendly_name") or user.get("username") or "?"
print(f" [{i+1}/{len(users)}] {name}...", end="", flush=True)
try:
result = analyze_user(base_url, args.api_key, user, after, args.geo)
if result:
results.append(result)
extra = ""
if result["teleportations"]:
extra = f", {len(result['teleportations'])} teleportation(s)!"
print(f" {result['n_devices']} devices, score {result['score']}{extra}")
else:
print(" skip (0-1 devices)")
except Exception as e:
print(f" error: {e}")
print_report(results, args.min_score, args.concurrent_only, args.timeline)
if __name__ == "__main__":
main()