Learn by Directing AI
Unit 2

Set up the project and extract the sources

Step 1: Set up the project

Open a terminal and start Claude Code. Paste this prompt:

Create the folder ~/dev/data-engineering/p4. Download the project materials from https://learnbydirectingai.dev/materials/dataeng/p4/materials.zip and extract them into that folder. Read CLAUDE.md -- it's the project governance file.

Claude creates the directory, downloads the materials, extracts everything, and reads the governance file. When it finishes, the project workspace is ready.

Step 2: Tour the project structure

Ask Claude to list the project directory structure. You should see:

  • materials/ -- the four CSV datasets (forestry, sawmill, customs, tag-batch mapping), the sample files from Unit 1, the pipeline spec template, the verification checklist, and the generation script
  • materials/dbt-scaffold/ -- a pre-configured dbt project with dbt_project.yml, profiles.yml, and empty model directories (staging/, intermediate/, marts/)
  • materials/dagster-scaffold/ -- a Dagster project skeleton with workspace.yaml, pyproject.toml, definitions.py, and an empty assets/ module

Open materials/CLAUDE.md. This is the project governance file. It contains the data dictionary for all four sources, naming conventions for dbt models (stg_, int_, fct_/dim_), a ticket backlog (T1 through T5), verification targets, and the commit convention. Every time you start a new Claude Code session on this project, the first thing Claude reads is this file.

Open materials/pipeline-spec-template.md. Sections 1 through 3 are filled in -- the pipeline overview, data source descriptions, and requirements. Sections 4 and 5 are empty. You will fill those after profiling the data and designing the schema.

Open materials/verification-checklist.md. This lists the row counts, match rates, chain of custody statistics, and yield ranges you will check your pipeline against when the work is done. Keep this file accessible -- you will return to it at every major checkpoint.

Step 3: Profile the forestry dataset

Direct Claude to profile materials/forestry-data.csv. Ask for column types, row counts, null patterns, and value distributions for key columns.

Profile materials/forestry-data.csv. Report column types, row count, null counts per column, and value distributions for concession_id, species, and harvest_team. Show the range of volume_m3 and the date range for harvest_date.

The full dataset has 500 rows across four concessions (C1 through C4). Each row is one harvested log. The log_tag column uses four-digit tags with leading zeros -- the same format you saw in the samples. Confirm the row count matches the verification checklist.

Compare what you see against the 12-row sample from Unit 1. The sample showed you the structure. The full dataset shows you the distributions -- which concessions have more logs, which species are most common, whether there are any nulls or unexpected values.
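Profiling like this needs no special tooling. Here is a minimal stdlib-only sketch of the kind of report you are asking Claude for -- the column names match the data dictionary, but the exact script Claude writes will differ:

```python
import csv
from collections import Counter

def profile_csv(path, categorical_cols, numeric_col):
    """Report row count, per-column null counts, value distributions
    for the given categorical columns, and the numeric column's range."""
    with open(path, newline="") as f:
        rows = list(csv.DictReader(f))
    nulls = Counter()
    dists = {c: Counter() for c in categorical_cols}
    values = []
    for row in rows:
        for col, val in row.items():
            if val is None or val.strip() == "":
                nulls[col] += 1
        for c in categorical_cols:
            dists[c][row[c]] += 1
        values.append(float(row[numeric_col]))
    return {
        "row_count": len(rows),
        "null_counts": dict(nulls),
        "distributions": {c: dict(d) for c, d in dists.items()},
        "range": (min(values), max(values)),
    }

# Example (against the forestry dataset):
# profile_csv("materials/forestry-data.csv",
#             ["concession_id", "species", "harvest_team"], "volume_m3")
```

Running your own quick profile alongside Claude's is a cheap way to spot-check its numbers against the verification checklist.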

Step 4: Profile the sawmill dataset

Profile materials/sawmill-data.csv with the same approach.

Profile materials/sawmill-data.csv. Report column types, row count, null counts per column, and value distributions for species and grade. Show the range of waste_percentage and sawn_timber_out_m3.

The sawmill dataset has 320 rows. Each row is one processing batch identified by a batch_number in the format SB-2024-NNN. There is no forestry log tag anywhere in this data. The sawmill system has its own identification scheme.

Note the waste_percentage column. This will matter when you calculate yield in the intermediate layer.

Step 5: Profile the tag-to-batch mapping

This is the most important profile. Direct Claude to examine materials/tag-batch-mapping.csv with attention to inconsistencies.

Profile materials/tag-batch-mapping.csv. Report row count, column types, null counts. For the forestry_log_tag column: show how many tags have leading zeros, how many don't, how many have trailing whitespace. Check for any tags that don't match the expected four-digit format.

The mapping file has 480 rows. This is the digitized logbook from the sawmill gate -- workers recorded which forestry log tags went into which sawmill batch.

The inconsistencies are real. Some tags have leading zeros ("0247"), some don't ("247"), some have trailing whitespace. Some entries use a format that doesn't match either the forestry system or the sawmill system. These are not random errors -- they are the result of manual data entry from a paper logbook.

Every inconsistency here is a potential chain of custody gap. A tag that can't be matched means a log that can't be traced from forest to export. The cleaning logic you build later needs to handle these, and the records that still can't be matched after cleaning need to go somewhere visible.
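The cleaning logic described above can be sketched as a single normalization function. This is an illustration of the pattern, not the exact rules your pipeline will need -- it assumes tags are meant to be four digits, as the forestry data shows:

```python
import re

VALID_TAG = re.compile(r"^\d{4}$")

def clean_log_tag(raw):
    """Normalize a hand-entered forestry log tag to the four-digit format.
    Returns (cleaned_tag, None) on success, or (None, reason) when the tag
    can't be resolved and should go to quarantine."""
    tag = raw.strip()               # trailing whitespace from manual entry
    if tag.isdigit() and 1 <= len(tag) <= 4:
        tag = tag.zfill(4)          # restore dropped leading zeros: "247" -> "0247"
    if VALID_TAG.match(tag):
        return tag, None
    return None, f"unresolvable tag format: {raw!r}"
```

Returning a reason instead of raising an exception keeps the failure attached to the record, which is exactly what the quarantine table needs later.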

Step 6: Profile the customs dataset

Profile materials/customs-data.csv.

Profile materials/customs-data.csv. Report row count, column types, null counts, and the distribution of flegt_status and destination_country. Check the batch_numbers column format.

The customs dataset has 180 rows. Each row is one export shipment. The batch_numbers column contains comma-separated sawmill batch numbers -- the same SB-2024-NNN format from the sawmill system. This connection is clean. No format mismatches, no inconsistencies.

The chain is now concrete: forestry tags -> (messy mapping) -> sawmill batches -> (clean reference) -> export shipments. The first link requires cleaning and error handling. The second link just works.

Step 7: Understand error quarantine

Before writing any extraction code, understand the design pattern you will use for records that can't be matched.

Error quarantine is a design decision: records that can't be processed go to an explicit error table instead of being silently dropped. The quarantine table stores the original record, the reason it failed, and when it was processed. Nothing is lost. Nothing is hidden.

This is different from what AI will typically generate. AI commonly writes error handling that swallows problems -- catching exceptions and continuing, or filtering out records that don't match a join condition. The output looks clean, but records are missing and there is no trace of what happened. When you direct Claude to write extraction and loading code, verify that it routes unresolvable records to a quarantine table rather than dropping them silently.
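The pattern is small enough to sketch in a few lines. This version accepts any cleaning function that returns (value, None) on success or (None, reason) on failure, and routes failures to a quarantine list instead of dropping them -- the field names follow the quarantine columns described above:

```python
from datetime import datetime, timezone

def route_records(records, clean):
    """Split records into (clean_rows, quarantine_rows).
    `clean` returns (value, None) on success or (None, reason) on failure.
    Failed records keep the original payload plus the reason and a
    timestamp -- nothing is lost, nothing is hidden."""
    clean_rows, quarantined = [], []
    for record in records:
        value, reason = clean(record["forestry_log_tag"])
        if reason is None:
            clean_rows.append({**record, "forestry_log_tag": value})
        else:
            quarantined.append({
                "original_record": record,
                "error_reason": reason,
                "processed_at": datetime.now(timezone.utc).isoformat(),
            })
    return clean_rows, quarantined
```

The invariant worth checking in any implementation: len(clean_rows) + len(quarantined) equals the input row count.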

Step 8: Write extraction scripts

Direct Claude to write Python extraction scripts that load each CSV into DuckDB. Each extraction should capture metadata: the source name, extraction timestamp, row count, and schema version.

Write a Python script that extracts all four CSV sources (forestry-data.csv, sawmill-data.csv, customs-data.csv, tag-batch-mapping.csv) into a DuckDB database. For each source, record extraction metadata in a separate table: source_name, extraction_timestamp, row_count, schema_version. For the tag-batch-mapping source, implement error quarantine: records where the forestry_log_tag can't be cleaned to a valid format go to an error_quarantine table with the original record, the reason, and the timestamp.

Review what Claude produces. Check three things:

  1. Per-source metadata. Each source gets its own metadata row with a timestamp and row count. Not one pipeline-level "extraction complete" entry -- four separate entries.
  2. Error quarantine for the mapping. Records with tags that can't be resolved after cleaning (trimming whitespace, normalizing leading zeros) go to error_quarantine, not to /dev/null. Check that the quarantine table includes the reason.
  3. No silent drops. Look for try/except blocks that swallow the exception and continue, or for WHERE clauses that filter out problematic records without routing them anywhere. AI commonly generates clean-looking code that silently loses records.
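To make the review concrete, here is a minimal sketch of per-source extraction with one metadata row per source. It uses sqlite3 so the sketch runs anywhere; duckdb.connect() exposes the same execute()/executemany() interface, so the shape carries over directly. Table and column names follow the prompt; every column is loaded as TEXT for simplicity:

```python
import csv
import sqlite3
from datetime import datetime, timezone

def extract_source(con, source_name, csv_path, schema_version="1"):
    """Load one CSV into its own table and record a per-source
    metadata row: source name, timestamp, row count, schema version."""
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)
    cols = ", ".join(f'"{c}" TEXT' for c in header)
    con.execute(f'CREATE TABLE "{source_name}" ({cols})')
    placeholders = ", ".join("?" for _ in header)
    con.executemany(f'INSERT INTO "{source_name}" VALUES ({placeholders})', rows)
    con.execute("""CREATE TABLE IF NOT EXISTS extraction_metadata (
        source_name TEXT, extraction_timestamp TEXT,
        row_count INTEGER, schema_version TEXT)""")
    con.execute("INSERT INTO extraction_metadata VALUES (?, ?, ?, ?)",
                (source_name, datetime.now(timezone.utc).isoformat(),
                 len(rows), schema_version))
    return len(rows)
```

Calling this once per CSV is what produces the four separate metadata entries that check 1 asks for.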

Step 9: Load and verify

Run the extraction script. Then verify the results by querying DuckDB directly.

Query each source table's row count:

SELECT 'forestry' AS source, COUNT(*) AS rows FROM forestry
UNION ALL SELECT 'sawmill', COUNT(*) FROM sawmill
UNION ALL SELECT 'customs', COUNT(*) FROM customs
UNION ALL SELECT 'tag_batch_mapping', COUNT(*) FROM tag_batch_mapping;

Expected: forestry 500, sawmill 320, customs 180, tag_batch_mapping just under 480 -- quarantined records are routed out of the main table, so its count drops by however many rows landed in error_quarantine.

Query the error quarantine table:

SELECT * FROM error_quarantine LIMIT 10;

This table should not be empty. Real records end up here -- tags that couldn't be matched after cleaning. Each row represents a chain of custody gap: a log that can't be traced from forest to export. The quarantine table is information, not an error condition.

Query the extraction metadata:

SELECT * FROM extraction_metadata;

Four rows, one per source. Each with a timestamp and a row count. This is the evidence trail that makes debugging possible later. If the forestry extraction returns 0 rows next Tuesday, the metadata table shows exactly when that happened.

The Dagster scaffold is in the project directory but you have not used it yet. You will wire these extraction scripts into Dagster assets in Unit 5. For now, the pipeline runs manually and all four sources are loaded with metadata and quarantine in place.


✓ Check

Check: Query each source table's row count. Query the error quarantine table -- it should contain records. Query the extraction metadata table -- it should show timestamps and counts per source.