This commit is contained in:
2026-05-13 09:37:42 -04:00
parent ca3469376a
commit 2889993a7d
40 changed files with 1583 additions and 0 deletions

70
twitch-highlight/.gitignore vendored Normal file
View File

@@ -0,0 +1,70 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db

View File

@@ -0,0 +1,72 @@
# Twitch Highlight
A Twitch highlight extraction tool for extracting clips and segments from Twitch streams.
## Configuration
This application requires Twitch API credentials to function. You need to set up a `.env` file in the root of the project with the following environment variables:
```
TWITCH_NICKNAME=your_twitch_username
TWITCH_OAUTH_TOKEN=oauth:your_oauth_token
TWITCH_CLIENT_ID=your_client_id
```
### Getting Twitch Credentials
You have a few options to get your OAuth token:
1. Go to [https://twitchtokengenerator.com/](https://twitchtokengenerator.com/)
2. Click "Connect with Twitch" to authorize
3. Select the scopes you need (e.g., "Chat: Read Chat", "Chat: Send Chat Messages")
4. Copy the generated token (it will start with `oauth:`)
#### 4. Your Twitch Nickname
This is simply your Twitch username (the one you use to log in). For example, if your Twitch channel is `https://twitch.tv/yourname`, your nickname is `yourname`.
### Setting Up the .env File
Create a `.env` file in the project root directory:
```env
TWITCH_NICKNAME=relicjamin1
TWITCH_OAUTH_TOKEN=oauth:your_actual_token_here
TWITCH_CLIENT_ID=your_actual_client_id_here
```
**Important**: Never commit your `.env` file to version control. The `.env` file is already included in `.gitignore`.
## Installation
```bash
pip install -e .
```
For development dependencies:
```bash
pip install -e ".[dev]"
```
## Usage
```bash
python ./src/main.py
```
## Development
```bash
# Run tests
pytest
# Format code
black .
# Lint
ruff check .
# Type check
mypy src/
```

View File

@@ -0,0 +1,69 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "twitch-highlight"
version = "0.1.0"
description = "Twitch highlight extraction tool"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}
authors = [
{name = "Collin Campbell", email = "collin@example.com"}
]
keywords = ["twitch", "highlight", "video"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = ["python-dotenv>=1.0.0", "streamlink>=6.0.0", "textual>=7.5.0"]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-cov>=4.0",
"black>=23.0",
"ruff>=0.1.0",
"mypy>=1.0",
"textual-dev>=1.8.0"
]
[project.urls]
Homepage = "https://github.com/collincampbell/twitch-highlight"
Repository = "https://github.com/collincampbell/twitch-highlight"
Issues = "https://github.com/collincampbell/twitch-highlight/issues"
[tool.setuptools.packages.find]
where = ["src"]
[tool.black]
line-length = 100
target-version = ['py38', 'py39', 'py310', 'py311', 'py312']
[tool.ruff]
line-length = 100
select = ["E", "F", "I", "N", "W"]
ignore = []
[tool.ruff.per-file-ignores]
"__init__.py" = ["F401"]
[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]

View File

@@ -0,0 +1,3 @@
"""Twitch Highlight Package."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,13 @@
"""Main module for twitch-highlight."""
from services.config import load_config
from tui.tui import TwitchHighlightTUI
def main():
"""Main entry point."""
TwitchHighlightTUI(load_config()).run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,33 @@
import os
from pathlib import Path
from dotenv import load_dotenv
env_path = Path(__file__).parent.parent.parent / ".env"
load_dotenv(env_path)
TWITCH_NICKNAME = os.getenv("TWITCH_NICKNAME", "")
TWITCH_OAUTH_TOKEN = os.getenv("TWITCH_OAUTH_TOKEN", "")
TWITCH_CLIENT_ID = os.getenv("TWITCH_CLIENT_ID", "")
class Config:
def __init__(self, nickname: str, oauth_token: str, client_id: str):
self.nickname = nickname
self.oauth_token = oauth_token
self.client_id = client_id
def load_config() -> Config:
missing = []
if not TWITCH_NICKNAME:
missing.append("TWITCH_NICKNAME")
if not TWITCH_OAUTH_TOKEN:
missing.append("TWITCH_OAUTH_TOKEN")
if not TWITCH_CLIENT_ID:
missing.append("TWITCH_CLIENT_ID")
if missing:
raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
return Config(TWITCH_NICKNAME, TWITCH_OAUTH_TOKEN, TWITCH_CLIENT_ID)

View File

@@ -0,0 +1,115 @@
import socket
import time
from dataclasses import dataclass
from collections import deque
from typing import Optional
@dataclass
class ChatRateStats:
current_rate: float
average_rate: float
total_messages: int
rate_of_change: float
class TwitchClient:
def __init__(self, oauth_token: str, nickname: str, window_seconds: int = 60):
self.oauth_token = oauth_token
self.nickname = nickname
self.window_seconds = window_seconds
self.socket: Optional[socket.socket] = None
self.message_timestamps: deque[float] = deque()
self.start_time: Optional[float] = None
self.total_messages = 0
self.connected = False
self.channel = ""
def set_channel(self, channel: str):
self.channel = channel
def connect(self) -> None:
self.socket = socket.socket()
self.socket.connect(("irc.chat.twitch.tv", 6667))
self.socket.send(f"PASS {self.oauth_token}\n".encode("utf-8"))
self.socket.send(f"NICK {self.nickname}\n".encode("utf-8"))
self.socket.send(f"JOIN #{self.channel}\n".encode("utf-8"))
self.connected = True
self.start_time = time.time()
def disconnect(self) -> None:
if self.socket:
self.socket.send("PART #{}\n".format(self.channel).encode("utf-8"))
self.socket.close()
self.connected = False
def _receive_messages(self) -> list[str]:
messages = []
if self.socket:
try:
data = self.socket.recv(2048).decode("utf-8")
print(data)
if data.find("PING") != -1:
print("sending back pong")
self.socket.send("PONG :tmi.twitch.tv\n".encode("utf-8"))
messages = [line for line in data.split("\r\n") if line.strip()]
except Exception:
pass
return messages
def _is_chat_message(self, line: str) -> bool:
return "PRIVMSG #{} :".format(self.channel) in line
def _cleanup_old_timestamps(self) -> None:
current_time = time.time()
while (
self.message_timestamps
and current_time - self.message_timestamps[0] > self.window_seconds
):
self.message_timestamps.popleft()
def read_messages(self) -> list[str]:
lines = self._receive_messages()
chat_messages = []
current_time = time.time()
for line in lines:
if self._is_chat_message(line):
chat_messages.append(line)
self.message_timestamps.append(current_time)
self.total_messages += 1
self._cleanup_old_timestamps()
return chat_messages
def _calculate_current_rate(self) -> float:
if not self.message_timestamps:
return 0.0
return len(self.message_timestamps) / self.window_seconds
def _calculate_average_rate(self) -> float:
if not self.start_time:
return 0.0
elapsed_time = time.time() - self.start_time
if elapsed_time == 0:
return 0.0
return self.total_messages / elapsed_time
def get_rate_stats(self, old: ChatRateStats) -> ChatRateStats:
self._cleanup_old_timestamps()
current_rate = self._calculate_current_rate()
average_rate = self._calculate_average_rate()
roc = 0.0
if old.current_rate != 0.0:
roc = (current_rate - old.current_rate) / old.current_rate
return ChatRateStats(
current_rate=current_rate,
average_rate=average_rate,
total_messages=self.total_messages,
rate_of_change=roc,
)

View File

@@ -0,0 +1,119 @@
import io
import json
import subprocess
import urllib.request
from pathlib import Path
from typing import Callable, Optional
class TwitchVideoClient:
def __init__(self, oauth_token: str, client_id: str, buffer_seconds: int = 15):
self.oauth_token = oauth_token
self.client_id = client_id
self.base_url = "https://api.twitch.tv/helix"
self.buffer_seconds = buffer_seconds
self._buffer: io.BytesIO = io.BytesIO()
self._is_recording = False
self._process: Optional[subprocess.Popen] = None
self.channel = ""
def set_channel(self, channel: str):
self.channel = channel
def _make_request(self, endpoint: str, params: Optional[dict] = None) -> dict:
url = f"{self.base_url}{endpoint}"
if params:
url += f"?{'&'.join(f'{k}={v}' for k, v in params.items())}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {self.oauth_token.removeprefix('oauth:')}")
req.add_header("Client-Id", self.client_id)
with urllib.request.urlopen(req) as response:
return json.loads(response.read().decode("utf-8"))
def _get_stream_url(self) -> str:
try:
import streamlink
streams = streamlink.streams(f"https://twitch.tv/{self.channel}")
if "best" not in streams:
raise ValueError(f"No stream available for channel {self.channel}")
return streams["best"].url
except ImportError:
raise ImportError("streamlink is required. Install with: pip install streamlink")
except Exception as e:
raise RuntimeError(f"Failed to get stream URL: {e}")
def start_recording(self, on_chunk: Optional[Callable[[bytes], None]] = None) -> None:
if self._is_recording:
raise RuntimeError("Already recording")
try:
stream_url = self._get_stream_url()
ffmpeg_cmd = [
"ffmpeg",
"-i",
stream_url,
"-t",
str(self.buffer_seconds),
"-c",
"copy",
"-f",
"mpegts",
"-",
]
self._process = subprocess.Popen(
ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
self._is_recording = True
self._buffer = io.BytesIO()
def read_stream():
try:
while True:
chunk = self._process.stdout.read(8192)
if not chunk:
break
self._buffer.write(chunk)
if on_chunk:
on_chunk(chunk)
except Exception:
pass
import threading
self._read_thread = threading.Thread(target=read_stream)
self._read_thread.daemon = True
self._read_thread.start()
except Exception as e:
raise RuntimeError(f"Failed to start recording: {e}")
def stop_recording(self) -> bytes:
if not self._is_recording:
raise RuntimeError("Not recording")
if self._process:
self._process.terminate()
self._process.wait()
self._is_recording = False
self._buffer.seek(0)
return self._buffer.getvalue()
def save_to_file(self, filepath: Path, data: Optional[bytes] = None) -> Path:
if data is None:
data = self._buffer.getvalue()
filepath.parent.mkdir(parents=True, exist_ok=True)
with open(filepath, "wb") as f:
f.write(data)
return filepath
def clear_buffer(self) -> None:
self._buffer = io.BytesIO()

View File

@@ -0,0 +1,114 @@
from services.config import Config
from services.twitch_client import TwitchClient
from services.twitch_video import TwitchVideoClient
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widgets import Input, Log, Button
import time
import datetime
from pathlib import Path
from services.twitch_client import ChatRateStats
import asyncio
class TwitchHighlightTUI(App):
CSS = """
.header {
height: 3;
dock: top;
}
.main-content {
height: 1fr;
}
#channel-input {
width: 1fr;
}
#connect-btn {
margin-left: 1;
}
#log {
height: 1fr;
border-top: solid $primary;
}
"""
def __init__(self, config: Config) -> None:
super().__init__()
self.twitch_chat = TwitchClient(config.oauth_token, config.nickname)
self.twitch_record = TwitchVideoClient(config.oauth_token, config.client_id)
def compose(self) -> ComposeResult:
with Vertical(classes="main-content"):
with Horizontal(classes="header"):
yield Input(placeholder="Enter channel name...", id="channel-input")
yield Button("Connect", id="connect-btn")
yield Button("Disconnect", id="disconnect-btn")
yield Log(id="log")
async def update(self):
start_time = time.time()
current_rate = ChatRateStats(
current_rate=0.0, average_rate=0.0, total_messages=0, rate_of_change=0.0
)
while self.twitch_chat.connected:
messages = self.twitch_chat.read_messages()
current_rate = self.twitch_chat.get_rate_stats(current_rate)
self.add_log(f"{current_rate}, highlights enabled: {time.time() - start_time > 120}")
for m in messages:
self.add_log(m)
if current_rate.rate_of_change > 0.8 and time.time() - start_time > 120:
self.add_log("POG CHAMP HIGHLIGH TIME")
self.twitch_record.stop_recording()
self.twitch_record.save_to_file(
Path(f"clips/{self.twitch_record.channel}-{datetime.datetime.now()}.mp4")
)
self.twitch_record.clear_buffer()
self.twitch_record.start_recording()
await asyncio.sleep(15)
self.add_log("disconnected worker")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "connect-btn":
channel_input = self.query_one("#channel-input", Input)
channel = channel_input.value.strip()
if channel:
self.log(f"Connecting to channel: {channel}")
self.twitch_chat.set_channel(channel)
self.twitch_record.set_channel(channel)
self.start_twitch()
else:
self.log("Please enter a channel name", "warning")
elif event.button.id == "disconnect-btn":
self.log("Disconnection from channel")
self.twitch_chat.set_channel("")
self.twitch_chat.set_channel("")
self.dissconnect_twitch()
def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "channel-input":
channel = event.input.value.strip()
if channel:
self.log(f"Connecting to channel: {channel}")
event.input.value = ""
else:
self.log("Please enter a channel name", "warning")
def start_twitch(self):
self.add_log("Connecting...")
self.twitch_chat.connect()
self.twitch_record.start_recording()
self.run_worker(self.update)
def dissconnect_twitch(self):
self.add_log("diconnecting, wait until worker reports its done")
self.twitch_chat.disconnect()
self.twitch_record.stop_recording()
def add_log(self, message: str, level: str = "info") -> None:
log_widget = self.query_one("#log", Log)
log_widget.write_line(f"[{level.upper()}] {message}")

View File

@@ -0,0 +1,6 @@
"""Tests package."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

View File

@@ -0,0 +1,8 @@
"""Example test module."""
import pytest
def test_example():
"""Example test function."""
assert True