Add Python TrainSim with loop track map and physics.
Some checks are pending
CodeView TrainSim CI / test (push) Waiting to run
CodeView TrainSim CI / docker (push) Blocked by required conditions

FastAPI server, replaceable JSON config, tests, Dockerfile.
Pairs with ATO via spec/interface.md contract.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Mona Lisa 2026-06-14 20:37:22 +00:00
parent 45c95836ef
commit 9c5ef3a5cd
18 changed files with 414 additions and 18 deletions

View file

@ -1,4 +1,4 @@
name: CodeView CI name: CodeView TrainSim CI
on: on:
push: push:
@ -7,28 +7,21 @@ on:
branches: [main] branches: [main]
jobs: jobs:
lint: test:
runs-on: docker runs-on: docker
container:
image: python:3.12-slim
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Lint - name: Pytest
run: | run: |
if [ -f package.json ]; then npm ci && npm run lint; else echo "no lint configured"; fi pip install -r requirements.txt
pytest -q
tests: docker:
runs-on: docker runs-on: docker
needs: lint needs: test
steps:
- uses: actions/checkout@v4
- name: Unit tests
run: |
if [ -f package.json ]; then npm test; elif [ -f pyproject.toml ]; then pip install -e . && pytest; else echo "no tests configured"; fi
docker-build:
runs-on: docker
needs: tests
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Docker build - name: Docker build
run: | run: docker build -t trainsim:ci .
if [ -f Dockerfile ]; then docker build -t app:ci .; elif [ -f docker-compose.yml ]; then docker compose build; else echo "no docker build configured"; fi

4
.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
__pycache__/
*.pyc
.pytest_cache/
.venv/

10
Dockerfile Normal file
View file

@ -0,0 +1,10 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN pytest -q
EXPOSE 8080
ENV SIM_CONFIG=/app/config/sim.default.json
ENV TRACK_CONFIG=/app/config/track.default.json
CMD ["uvicorn", "trainsim.app:create_app", "--factory", "--host", "0.0.0.0", "--port", "8080"]

View file

@ -1,3 +1,22 @@
# trainsim # trainsim
Python train simulation for ATO integration Python train **simulation** for ATO integration tests. Circle track, simple physics, map UI.
## Layout
| Path | Role |
|------|------|
| `trainsim/` | API + physics |
| `config/` | Replaceable sim + track JSON |
| `spec/` | Interface contract (shared with `ato`) |
| `static/map.html` | Route map (nodes, edges, train dot) |
## Run locally
```bash
pip install -r requirements.txt
pytest -q
uvicorn trainsim.app:create_app --factory --host 0.0.0.0 --port 8080
```
Detailed simulation notes — *TODO*.

8
config/sim.default.json Normal file
View file

@ -0,0 +1,8 @@
{
"track_length_m": 1000.0,
"tick_hz": 20.0,
"traction_gain": 0.15,
"brake_gain": 0.25,
"drag": 0.02,
"max_speed_mps": 25.0
}

14
config/track.default.json Normal file
View file

@ -0,0 +1,14 @@
{
"nodes": [
{"id": "A", "x": 200, "y": 100},
{"id": "B", "x": 400, "y": 200},
{"id": "C", "x": 200, "y": 300},
{"id": "D", "x": 0, "y": 200}
],
"edges": [
{"from": "A", "to": "B", "length_m": 250},
{"from": "B", "to": "C", "length_m": 250},
{"from": "C", "to": "D", "length_m": 250},
{"from": "D", "to": "A", "length_m": 250}
]
}

4
requirements.txt Normal file
View file

@ -0,0 +1,4 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
pytest==8.3.4
httpx==0.28.1

4
spec/actuate.json Normal file
View file

@ -0,0 +1,4 @@
{
"traction_pct": 0,
"brake_pct": 0
}

45
spec/interface.md Normal file
View file

@ -0,0 +1,45 @@
# ATO ↔ TrainSim interface (v1)
Standalone. No ETCS. No ATP. JSON over HTTP.
Base URL (TrainSim): `http://trainsim:8080`
## TrainSim → ATO reads
`GET /train/state`
```json
{
"speed_mps": 0.0,
"position_m": 0.0,
"track_length_m": 1000.0,
"sim_running": false
}
```
## ATO → TrainSim writes
`POST /train/actuate`
```json
{
"traction_pct": 0,
"brake_pct": 0
}
```
Percent 0100. Do not send both > 0 (TrainSim clamps brake wins).
`POST /sim/start` — start physics tick (empty body)
`POST /sim/stop` — pause physics (empty body)
Optional (TrainSim logs only): `POST /train/doors` `{ "command": "open" | "close" }`
## ATO configuration (env)
| Variable | Meaning |
|----------|---------|
| `TRAINSIM_URL` | TrainSim base URL |
| `TARGET_SPEED_MPS` | Cruise target (default 10) |
| `TICK_MS` | Control period (default 100) |

6
spec/train_state.json Normal file
View file

@ -0,0 +1,6 @@
{
"speed_mps": 0.0,
"position_m": 0.0,
"track_length_m": 1000.0,
"sim_running": false
}

66
static/map.html Normal file
View file

@ -0,0 +1,66 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>TrainSim map</title>
<style>
body { font-family: sans-serif; background: #0a0e14; color: #cce; margin: 1rem; }
svg { background: #111820; border: 1px solid #007acc; border-radius: 8px; }
.train { fill: #0098ff; }
.edge { stroke: #445; stroke-width: 4; fill: none; }
.node { fill: #007acc; }
</style>
</head>
<body>
<h1>TrainSim — loop track</h1>
<p id="state">loading…</p>
<svg id="map" width="420" height="420"></svg>
<script>
const svg = document.getElementById('map');
const stateEl = document.getElementById('state');
async function refresh() {
const [layout, state] = await Promise.all([
fetch('/track/layout').then(r => r.json()),
fetch('/train/state').then(r => r.json()),
]);
draw(layout, state);
stateEl.textContent = `speed ${state.speed_mps} m/s · position ${state.position_m} m · running ${state.sim_running}`;
}
function draw(layout, state) {
svg.innerHTML = '';
const nodes = Object.fromEntries(layout.nodes.map(n => [n.id, n]));
layout.edges.forEach(e => {
const a = nodes[e.from], b = nodes[e.to];
const line = document.createElementNS('http://www.w3.org/2000/svg', 'line');
line.setAttribute('x1', a.x); line.setAttribute('y1', a.y);
line.setAttribute('x2', b.x); line.setAttribute('y2', b.y);
line.setAttribute('class', 'edge');
svg.appendChild(line);
});
layout.nodes.forEach(n => {
const c = document.createElementNS('http://www.w3.org/2000/svg', 'circle');
c.setAttribute('cx', n.x); c.setAttribute('cy', n.y); c.setAttribute('r', 8);
c.setAttribute('class', 'node');
svg.appendChild(c);
});
const t = state.position_m / state.track_length_m;
const ids = layout.edges.map(e => e.from);
const idx = Math.floor(t * ids.length) % ids.length;
const from = nodes[ids[idx]];
const to = nodes[layout.edges[idx].to];
const f = t * ids.length - idx;
const x = from.x + (to.x - from.x) * f;
const y = from.y + (to.y - from.y) * f;
const train = document.createElementNS('http://www.w3.org/2000/svg', 'circle');
train.setAttribute('cx', x); train.setAttribute('cy', y); train.setAttribute('r', 12);
train.setAttribute('class', 'train');
svg.appendChild(train);
}
setInterval(refresh, 200);
refresh();
</script>
</body>
</html>

4
tests/conftest.py Normal file
View file

@ -0,0 +1,4 @@
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

26
tests/test_api.py Normal file
View file

@ -0,0 +1,26 @@
import os
from fastapi.testclient import TestClient
os.environ.setdefault("SIM_CONFIG", "config/sim.default.json")
os.environ.setdefault("TRACK_CONFIG", "config/track.default.json")
from trainsim.app import create_app # noqa: E402
client = TestClient(create_app())
def test_health():
assert client.get("/health").json()["status"] == "ok"
def test_state_shape():
s = client.get("/train/state").json()
assert "speed_mps" in s and "track_length_m" in s
def test_actuate_and_start():
client.post("/sim/start")
client.post("/train/actuate", json={"traction_pct": 50, "brake_pct": 0})
s = client.get("/train/state").json()
assert s["sim_running"] is True

22
tests/test_physics.py Normal file
View file

@ -0,0 +1,22 @@
from trainsim.config_loader import SimParams
from trainsim.physics import Train
def test_train_accelerates_with_traction():
p = SimParams(1000, 20, 0.15, 0.25, 0.02, 25)
t = Train()
t.sim_running = True
t.actuate(100, 0)
for _ in range(120):
t.tick(p, 0.05)
assert t.speed_mps > 0.5
def test_brake_slows_train():
p = SimParams(1000, 20, 0.15, 0.25, 0.02, 25)
t = Train(speed_mps=10.0)
t.sim_running = True
t.actuate(0, 100)
for _ in range(200):
t.tick(p, 0.05)
assert t.speed_mps < 10.0

1
trainsim/__init__.py Normal file
View file

@ -0,0 +1 @@
"""TrainSim — lightweight train physics for ATO testing."""

92
trainsim/app.py Normal file
View file

@ -0,0 +1,92 @@
"""HTTP API — see spec/interface.md."""
from __future__ import annotations
import asyncio
import os
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from trainsim.config_loader import SimParams, TrackLayout, load_sim, load_track
from trainsim.physics import Train
ROOT = Path(__file__).resolve().parent.parent
class ActuateIn(BaseModel):
traction_pct: int = Field(ge=0, le=100)
brake_pct: int = Field(ge=0, le=100)
class DoorIn(BaseModel):
command: str
def create_app() -> FastAPI:
params = load_sim(os.environ.get("SIM_CONFIG"))
track = load_track(os.environ.get("TRACK_CONFIG"))
train = Train()
@asynccontextmanager
async def lifespan(_: FastAPI):
async def tick_loop() -> None:
dt = 1.0 / params.tick_hz
while True:
train.tick(params, dt)
await asyncio.sleep(dt)
task = asyncio.create_task(tick_loop())
yield
task.cancel()
app = FastAPI(title="TrainSim", lifespan=lifespan)
app.mount("/static", StaticFiles(directory=ROOT / "static"), name="static")
@app.get("/health")
def health() -> dict[str, str]:
return {"status": "ok"}
@app.get("/")
def map_page() -> FileResponse:
return FileResponse(ROOT / "static" / "map.html")
@app.get("/train/state")
def train_state() -> dict:
return {
"speed_mps": round(train.speed_mps, 3),
"position_m": round(train.position_m, 3),
"track_length_m": params.track_length_m,
"sim_running": train.sim_running,
"door_signal": train.door_signal,
}
@app.get("/track/layout")
def track_layout() -> dict:
return {"nodes": track.nodes, "edges": track.edges}
@app.post("/train/actuate")
def actuate(body: ActuateIn) -> dict[str, str]:
train.actuate(body.traction_pct, body.brake_pct)
return {"status": "ok"}
@app.post("/train/doors")
def doors(body: DoorIn) -> dict[str, str]:
train.door_signal = body.command
return {"status": "ok", "door_signal": train.door_signal}
@app.post("/sim/start")
def sim_start() -> dict[str, str]:
train.sim_running = True
return {"status": "ok"}
@app.post("/sim/stop")
def sim_stop() -> dict[str, str]:
train.sim_running = False
return {"status": "ok"}
return app

44
trainsim/config_loader.py Normal file
View file

@ -0,0 +1,44 @@
"""Load JSON config files (replaceable without code changes)."""
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class SimParams:
track_length_m: float
tick_hz: float
traction_gain: float
brake_gain: float
drag: float
max_speed_mps: float
@dataclass(frozen=True)
class TrackLayout:
nodes: list[dict[str, Any]]
edges: list[dict[str, Any]]
def load_sim(path: str | None = None) -> SimParams:
p = Path(path or os.environ.get("SIM_CONFIG", "config/sim.default.json"))
data = json.loads(p.read_text(encoding="utf-8"))
return SimParams(
track_length_m=float(data["track_length_m"]),
tick_hz=float(data["tick_hz"]),
traction_gain=float(data["traction_gain"]),
brake_gain=float(data["brake_gain"]),
drag=float(data["drag"]),
max_speed_mps=float(data["max_speed_mps"]),
)
def load_track(path: str | None = None) -> TrackLayout:
p = Path(path or os.environ.get("TRACK_CONFIG", "config/track.default.json"))
data = json.loads(p.read_text(encoding="utf-8"))
return TrackLayout(nodes=list(data["nodes"]), edges=list(data["edges"]))

34
trainsim/physics.py Normal file
View file

@ -0,0 +1,34 @@
"""Simple 1-D train on a loop."""
from __future__ import annotations
from dataclasses import dataclass
from trainsim.config_loader import SimParams
@dataclass
class Train:
speed_mps: float = 0.0
position_m: float = 0.0
traction_pct: int = 0
brake_pct: int = 0
sim_running: bool = False
door_signal: str = "closed"
def actuate(self, traction_pct: int, brake_pct: int) -> None:
self.traction_pct = max(0, min(100, traction_pct))
self.brake_pct = max(0, min(100, brake_pct))
if self.traction_pct > 0 and self.brake_pct > 0:
self.traction_pct = 0
def tick(self, params: SimParams, dt: float) -> None:
if not self.sim_running:
return
force = (
self.traction_pct / 100.0 * params.traction_gain
- self.brake_pct / 100.0 * params.brake_gain
- params.drag * self.speed_mps
)
self.speed_mps = max(0.0, min(params.max_speed_mps, self.speed_mps + force * dt))
self.position_m = (self.position_m + self.speed_mps * dt) % params.track_length_m