Pipeline Specification -- Astana Grain Terminal
Overview
Astana Grain Terminal operates two grain elevators in Kazakhstan's Kostanay region with a combined storage capacity of 120,000 tonnes. The Operations Director, Assel Nurzhanova, needs a pipeline that combines grain storage data from both elevators with weather data from the Open-Meteo API. The goal: correlate spoilage events with weather conditions to reduce the 800 tonnes of grain lost to spoilage over the past two years.
Data sources
Storage data
Monthly CSV exports of daily readings from two grain elevators. Each file contains one month of storage readings from one elevator.
Columns: bin_id, grain_type, moisture_pct, farmer_id, reading_date, dispatch_date, quality_status
- bin_id: Bin identifier. Format varies by elevator.
- grain_type: wheat, barley, or flax.
- moisture_pct: Moisture reading as a percentage (float, 8.0-16.0).
- farmer_id: Farmer account identifier (11-digit string).
- reading_date: Date of the storage reading (YYYY-MM-DD format, Kazakhstan time UTC+6).
- dispatch_date: Date grain was dispatched (nullable -- empty if grain is still in storage).
- quality_status: good, degraded, or spoilage.
Volume: 6 months of data per elevator (January-June 2025). Approximately 1,200 rows per month for Elevator A (48 bins), approximately 900 rows per month for Elevator B (36 bins). Not every bin has a reading every day.
Files provided: materials/storage-data/ directory contains monthly CSV files for both elevators, plus sample files with the first 50 rows of January data.
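A minimal loader sketch for the monthly files, using only the standard library. The column list comes from the spec above; the function name and the fail-loud validation behavior are implementation choices, not requirements.

```python
import csv

EXPECTED_COLUMNS = ["bin_id", "grain_type", "moisture_pct", "farmer_id",
                    "reading_date", "dispatch_date", "quality_status"]

def read_storage_file(path):
    """Read one monthly elevator CSV, validating the header.

    Returns a list of row dicts. Raises ValueError on schema drift so a
    malformed export fails loudly instead of silently polluting staging.
    """
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        if reader.fieldnames != EXPECTED_COLUMNS:
            raise ValueError(f"{path}: unexpected columns {reader.fieldnames}")
        rows = []
        for row in reader:
            # moisture_pct is a float in the 8.0-16.0 range per the spec
            row["moisture_pct"] = float(row["moisture_pct"])
            # dispatch_date is nullable: normalize empty string to None
            row["dispatch_date"] = row["dispatch_date"] or None
            rows.append(row)
    return rows
```

The sample files in materials/storage-data/ are a good target for a first smoke test of this reader before running it across all twelve monthly files.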
Weather data
Daily weather observations from the Open-Meteo API for the Kostanay region.
API endpoint: https://api.open-meteo.com/v1/forecast serves recent/forecast data only; the January-June 2025 range is served by the historical archive endpoint (https://archive-api.open-meteo.com/v1/archive)
Location: Latitude 53.21, Longitude 63.63 (Kostanay, Kazakhstan)
Fields: temperature_2m_max, temperature_2m_min, temperature_2m_mean, relative_humidity_2m_mean, precipitation_sum
Date range: 2025-01-01 to 2025-06-30 (matching the storage data period)
Timezone: The API returns data in UTC. Storage readings are in UTC+6 (Kazakhstan time).
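A sketch of the extraction request and the UTC-to-UTC+6 conversion, using only the standard library. The archive URL reflects Open-Meteo's documented historical endpoint; the function names and the choice to request UTC and convert locally are assumptions of this sketch, not mandated by the spec.

```python
from datetime import datetime, timedelta, timezone

# Open-Meteo serves historical data from its archive API host.
ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"

# Kazakhstan time is a fixed UTC+6 offset (no daylight saving).
KAZAKHSTAN_TZ = timezone(timedelta(hours=6))

def build_archive_request(lat=53.21, lon=63.63,
                          start="2025-01-01", end="2025-06-30"):
    """Assemble the query for one bulk historical pull over the full range."""
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start,
        "end_date": end,
        "daily": ",".join([
            "temperature_2m_max", "temperature_2m_min",
            "temperature_2m_mean", "relative_humidity_2m_mean",
            "precipitation_sum",
        ]),
        "timezone": "UTC",  # keep raw data in UTC; convert in staging
    }
    return ARCHIVE_URL, params

def utc_to_local_date(utc_iso):
    """Shift a UTC timestamp into UTC+6 and return the local calendar date."""
    dt = datetime.fromisoformat(utc_iso).replace(tzinfo=timezone.utc)
    return dt.astimezone(KAZAKHSTAN_TZ).date().isoformat()
```

The conversion matters at day boundaries: an observation stamped 20:00 UTC already belongs to the next Kazakhstan calendar day.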
Target outputs
A combined fact table that joins storage readings with weather conditions by date for the Kostanay region.
Grain: One row per bin per day -- each storage reading joined with that day's weather observation. This grain supports:
- Spoilage-event-to-weather correlation (which weather conditions preceded spoilage?)
- Storage utilization by bin, grain type, and farmer
- Historical trend analysis across both elevators
Schema requirements
Staging layer
- stg_storage: Storage readings with standardized bin IDs (both elevator formats mapped to a consistent scheme including elevator identification), cleaned types, consistent column naming.
- stg_weather: Weather data with timestamps converted from UTC to UTC+6 (Kazakhstan time), cleaned types, consistent column naming.
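One way to sketch the bin ID standardization. The spec only says the format varies by elevator, so the parsing below (extracting the numeric part of the raw ID) is a placeholder; the target scheme `<elevator>-<NNN>` is likewise an assumption, chosen so the elevator is always identifiable from the ID.

```python
def standardize_bin_id(raw_bin_id, elevator):
    """Map a raw bin ID from either elevator to '<elevator>-<NNN>'.

    Hypothetical parsing rule: keep only the digits of the raw ID and
    zero-pad to three places. Replace with the real per-elevator rules
    once the actual formats in the CSVs are known.
    """
    digits = "".join(ch for ch in raw_bin_id if ch.isdigit())
    if not digits:
        raise ValueError(f"no bin number found in {raw_bin_id!r}")
    return f"{elevator}-{int(digits):03d}"
```

Whatever rule is adopted, it should be total over both elevators' formats and raise on anything unrecognized, so a new export format surfaces as an error rather than a silently mangled ID.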
Mart layer
A fact table joining storage readings with weather conditions on date. The join key is the reading date (from storage) matched to the observation date (from weather, after timezone conversion). All storage readings that fall within the weather data date range should have weather conditions available.
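The fact-table join can be sketched as a single CREATE TABLE AS statement. The project targets DuckDB; the SQL below is engine-agnostic and is demonstrated on sqlite3 only so the sketch is self-contained. Table and column names (`fct_storage_weather`, `observation_date`) are assumed, matching the staging layer described above.

```python
import sqlite3

FCT_SQL = """
CREATE TABLE fct_storage_weather AS
SELECT s.bin_id, s.grain_type, s.moisture_pct, s.quality_status,
       s.reading_date,
       w.temperature_2m_mean, w.relative_humidity_2m_mean,
       w.precipitation_sum
FROM stg_storage AS s
JOIN stg_weather AS w
  ON s.reading_date = w.observation_date
"""

def build_mart(conn):
    """Rebuild the mart from scratch; drop-and-create keeps re-runs idempotent."""
    conn.execute("DROP TABLE IF EXISTS fct_storage_weather")
    conn.execute(FCT_SQL)
```

An inner join is appropriate here because the weather range covers the full storage period, so every reading should find its day's observation; a reading without a match would indicate a data problem worth surfacing.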
Verification targets
See verification-checklist.md for exact values. Key checks:
- Storage source row count: Total rows across all elevator CSV files.
- Weather extraction count: Must match the number of days in the date range (one observation per day).
- Staging counts: stg_storage row count must match raw storage count exactly; stg_weather row count must match raw weather count exactly.
- Mart count: Number of storage readings with matching weather data.
- Spoilage correlation: Spot-check specific spoilage events against weather conditions for those dates. Spoilage events should show temperature below -15C and humidity above 80%.
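The count checks above can be wired into a single fail-fast function. The exact expected values live in verification-checklist.md; this sketch encodes only the structural invariants, and the function name and signature are assumptions.

```python
from datetime import date

# 2025-01-01 through 2025-06-30 inclusive: one weather row per day.
EXPECTED_DAYS = (date(2025, 6, 30) - date(2025, 1, 1)).days + 1  # 181

def verify_counts(raw_storage, stg_storage, raw_weather, stg_weather, mart):
    """Raise if any layer drops or duplicates rows."""
    checks = {
        "stg_storage must match raw storage count": stg_storage == raw_storage,
        "stg_weather must match raw weather count": stg_weather == raw_weather,
        "one weather observation per day": raw_weather == EXPECTED_DAYS,
        "mart cannot exceed staged storage rows": mart <= stg_storage,
    }
    failed = [name for name, ok in checks.items() if not ok]
    if failed:
        raise AssertionError("verification failed: " + "; ".join(failed))
```

Running this at the end of every pipeline execution turns a silent row-count drift into an immediate, named failure.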
Operational requirements
- Idempotency: The extraction and pipeline must be safe to re-run. Running the extraction twice must produce the same row count (no duplicates). Running the full pipeline twice must produce the same mart output.
- Logging: The extraction script must log: records fetched per request, any HTTP errors encountered, total extraction time, and the final count comparison against the API's reported total.
- Execution: The pipeline runs as Python scripts and SQL queries against DuckDB. No Dagster orchestration for this project.
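The logging requirement can be sketched as a thin wrapper around the fetch step. The `fetch` callable and `expected_total` parameter stand in for the real API call and the API's reported total; both are placeholders for this sketch.

```python
import logging
import time

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("extract_weather")

def extract(fetch, expected_total):
    """Run one fetch with the logging the spec requires: records per
    request, errors, total time, and a final count comparison."""
    start = time.monotonic()
    total = 0
    try:
        records = fetch()
        log.info("fetched %d records", len(records))
        total += len(records)
    except Exception as exc:  # e.g. an HTTP error from the real client
        log.error("request failed: %s", exc)
        raise
    log.info("extraction finished in %.1fs", time.monotonic() - start)
    if total != expected_total:
        log.warning("count mismatch: got %d, expected %d",
                    total, expected_total)
    return total
```

Because the extraction is a single bulk pull over the full date range, one request is logged here; if the real script pages by month, the same per-request logging applies inside the loop.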