Step 1: Create the dbt project structure
Set up the dbt project with DuckDB as the adapter. Define sources for both mills -- Mill 1's CSV export and Mill 2's JSON export. Plan the staging models: stg_mill1_daily and stg_mill2_daily as source-conformed staging models, then int_daily_operations as the unifying intermediate model.
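One hypothetical layout consistent with this plan (the staging/intermediate folder split is a common dbt convention, not something dbt mandates):

```text
models/
├── staging/
│   ├── stg_mill1_daily.sql
│   ├── stg_mill2_daily.sql
│   └── sources.yml        # defines the mill1 (CSV) and mill2 (JSON) sources
└── intermediate/
    └── int_daily_operations.sql
```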
With project memory active, AI should follow the naming conventions and field mappings from CLAUDE.md. Watch for this -- if the generated project structure uses names that do not match your data dictionary, the project memory is not loading or the entries are too vague.
Step 2: Build the Mill 1 staging model
Direct AI to build the Mill 1 staging model with incremental extraction. The model uses dbt's incremental materialization with the is_incremental() macro and a unique_key configuration that maps to your MERGE key from Unit 3.
dbt's incremental materialization is how MERGE works inside the dbt framework. When the model runs for the first time, it creates the table from scratch. On subsequent runs, it only processes records newer than the watermark and uses the unique_key to MERGE them -- inserting new records and updating existing ones.
The key configuration:

```sql
{{
  config(
    materialized='incremental',
    unique_key='record_id'
  )
}}
```
Verify what AI generates. Does the model use MERGE (incremental with unique_key), not INSERT? Does it reference the correct watermark column (mill_date)? Does the naming follow the stg_ convention from CLAUDE.md? Does it use the field names from your data dictionary?
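Putting the pieces together, a minimal sketch of what stg_mill1_daily should look like. The source name 'mill1' and the exact column list are assumptions -- match them to your sources definition and data dictionary:

```sql
{{
  config(
    materialized='incremental',
    unique_key='record_id'
  )
}}

select
    record_id,
    mill_date,
    farmer_name,
    paddy_weight_kg,
    moisture_pct
from {{ source('mill1', 'daily_export') }}

{% if is_incremental() %}
  -- Only process records newer than the current watermark.
  -- {{ this }} refers to the already-built target table.
  where mill_date > (select max(mill_date) from {{ this }})
{% endif %}
```

On the first run the `is_incremental()` block is skipped and the table is built from scratch; on later runs only rows past the watermark are selected, and `unique_key` tells dbt to MERGE them into the existing table.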
Step 3: Build the Mill 2 staging model
Mill 2 is a JSON source with different field names. The staging model maps supplier_name to farmer_name, weight_kg to paddy_weight_kg, moisture_percent to moisture_pct, and so on. Same MERGE logic, same watermark approach, different source schema.
Check the field mapping against materials/field-mapping.md. AI commonly gets column mappings wrong when source schemas differ -- it may map weight_kg to the wrong unified name or miss a column entirely. Verify each mapping.
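A sketch of the Mill 2 staging model showing the renames called out above. The source name 'mill2' and the watermark column name on the JSON side are assumptions -- verify every mapping against materials/field-mapping.md:

```sql
{{
  config(
    materialized='incremental',
    unique_key='record_id'
  )
}}

select
    record_id,
    mill_date,
    supplier_name    as farmer_name,      -- rename to the unified schema
    weight_kg        as paddy_weight_kg,
    moisture_percent as moisture_pct
from {{ source('mill2', 'daily_export') }}

{% if is_incremental() %}
  where mill_date > (select max(mill_date) from {{ this }})
{% endif %}
```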
Step 4: Build the unifying intermediate model
The intermediate model (int_daily_operations) combines both mills' staging data into a single schema. Add a mill_id column to distinguish which mill each record came from.
This is where you decide what to do with advance payments. Mill 2 has records where Kyaw Zin Oo paid a farmer for paddy that has not been delivered yet -- weight_kg and harvest_quality are null. Two options:
- Filter at staging. Remove advance payment records before they reach the intermediate model. Simpler downstream, but Kyaw Zin Oo loses visibility into farmer payment tracking.
- Flag at intermediate. Keep the records but add an is_advance_payment flag. More complex, but preserves data Kyaw Zin Oo needs for farmer relationship management.
The choice depends on what the mart layer needs. Kyaw Zin Oo wants to track farmer deliveries and payments over time. Filtering advance payments removes part of that picture.
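If you take the flag-at-intermediate option, the unifying model might look like the sketch below. It assumes both staging models expose the same unified column set (so the UNION ALL lines up) and that advance payments are identifiable by null weight and quality:

```sql
with mill1 as (
    select
        *,
        'mill_1' as mill_id,
        false    as is_advance_payment
    from {{ ref('stg_mill1_daily') }}
),

mill2 as (
    select
        *,
        'mill_2' as mill_id,
        -- Flag rather than filter: advance payments have no delivery yet
        (paddy_weight_kg is null and harvest_quality is null) as is_advance_payment
    from {{ ref('stg_mill2_daily') }}
)

select * from mill1
union all
select * from mill2
```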
Step 5: Run and verify the incremental pipeline
Run dbt build to execute the full pipeline. Then test the incremental behavior:
- Load the initial data. Check row counts in each staging table.
- Load the next day's data. Verify the watermark advanced -- new records appeared without duplicating old ones.
- Simulate a correction: re-load day 3 data from materials/mill1-day3-corrected.csv. Query stg_mill1_daily -- are there duplicates for day 3? If the MERGE key is correct, the corrected values replaced the originals and the row count stayed the same.
Count the rows in int_daily_operations. The total should match the sum of unique records from both mills -- no duplicates from the correction re-load, no missing records from the incremental logic.
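The checks above can be run as ad hoc queries in DuckDB. A sketch, assuming the model names used in this lesson:

```sql
-- Duplicate check: any record_id appearing more than once means
-- the MERGE key is not deduplicating the correction re-load.
select record_id, count(*) as n
from stg_mill1_daily
group by record_id
having count(*) > 1;

-- Row-count reconciliation: staging totals should match the intermediate layer.
select
    (select count(*) from stg_mill1_daily)
  + (select count(*) from stg_mill2_daily)         as staging_total,
    (select count(*) from int_daily_operations)    as intermediate_total;
```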
Check: Run dbt build. Then simulate a correction: re-load a day's data from Mill 1. Query the staging table -- are there duplicates? If not, the MERGE is working. Count the rows in int_daily_operations and verify it matches expected totals.