Learn by Directing AI
All materials

app.py

pyapp.py
from flask import Flask, request, jsonify, render_template_string
import sqlite3
import os

app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', 'dev-key')

DB_PATH = os.environ.get('DB_PATH', './data/members.db')


def get_db():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = get_db()
    conn.executescript('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL,
            role TEXT DEFAULT 'member',
            active INTEGER DEFAULT 1,
            last_login TEXT
        );

        CREATE TABLE IF NOT EXISTS farmers (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            cedula TEXT NOT NULL,
            bank_account TEXT,
            phone TEXT,
            farm_location TEXT,
            altitude_m INTEGER,
            hectares REAL
        );

        CREATE TABLE IF NOT EXISTS harvests (
            id INTEGER PRIMARY KEY,
            farmer_id INTEGER,
            date TEXT NOT NULL,
            quantity_kg REAL NOT NULL,
            quality_grade TEXT,
            notes TEXT,
            FOREIGN KEY (farmer_id) REFERENCES farmers(id)
        );

        CREATE TABLE IF NOT EXISTS payments (
            id INTEGER PRIMARY KEY,
            farmer_id INTEGER,
            amount REAL NOT NULL,
            currency TEXT DEFAULT 'USD',
            date TEXT,
            reference TEXT,
            FOREIGN KEY (farmer_id) REFERENCES farmers(id)
        );
    ''')

    # Seed users including stale accounts
    cursor = conn.execute('SELECT COUNT(*) FROM users')
    if cursor.fetchone()[0] == 0:
        users = [
            ('andres', 'nubes2024', 'admin', 1),
            ('maria_lopez', 'harvest123', 'staff', 1),
            ('carlos_mendez', 'coop2023', 'staff', 1),
            # Stale accounts -- former staff who left
            ('julian_torres', 'temp2022', 'staff', 1),
            ('rosa_garcia', 'rosa2023', 'staff', 1),
        ]
        conn.executemany(
            'INSERT INTO users (username, password, role, active) VALUES (?, ?, ?, ?)',
            users
        )

        # Seed farmers
        farmers = [
            ('Pedro Sanchez', 'V-12345678', 'VE-0102-1234-5678', '+58-276-555-0101', 'El Cobre, Tachira', 1650, 3.5),
            ('Ana Maria Duque', 'V-23456789', 'VE-0102-2345-6789', '+58-276-555-0102', 'Rubio, Tachira', 1800, 2.0),
            ('Luis Fernando Contreras', 'V-34567890', 'VE-0102-3456-7890', '+58-276-555-0103', 'Delicias, Tachira', 1720, 4.0),
            ('Carmen Rosa Pena', 'V-45678901', 'VE-0102-4567-8901', '+58-276-555-0104', 'La Grita, Tachira', 1900, 1.5),
            ('Jose Manuel Ramirez', 'V-56789012', 'VE-0102-5678-9012', '+58-276-555-0105', 'Pregonero, Tachira', 1550, 5.0),
        ]
        conn.executemany(
            'INSERT INTO farmers (name, cedula, bank_account, phone, farm_location, altitude_m, hectares) VALUES (?, ?, ?, ?, ?, ?, ?)',
            farmers
        )

        conn.commit()
    conn.close()


HARVEST_FORM = '''
<!DOCTYPE html>
<html>
<head><title>Member Portal - Harvest Report</title></head>
<body>
<h1>Cooperativa Nubes del Tachira</h1>
<h2>Report Harvest</h2>
<form method="POST" action="/harvest">
    <label>Farmer ID: <input type="number" name="farmer_id" required></label><br><br>
    <label>Quantity (kg): <input type="number" step="0.1" name="quantity_kg" required></label><br><br>
    <label>Quality Grade: <input type="text" name="quality_grade"></label><br><br>
    <label>Notes: <textarea name="notes"></textarea></label><br><br>
    <button type="submit">Submit Report</button>
</form>
<div id="results">{{ results|safe }}</div>
</body>
</html>
'''


@app.route('/')
def index():
    return render_template_string(HARVEST_FORM, results='')


@app.route('/harvest', methods=['POST'])
def submit_harvest():
    farmer_id = request.form.get('farmer_id')
    quantity = request.form.get('quantity_kg')
    grade = request.form.get('quality_grade', '')
    # VULNERABLE: XSS -- notes field rendered without escaping
    notes = request.form.get('notes', '')

    conn = get_db()
    conn.execute(
        'INSERT INTO harvests (farmer_id, date, quantity_kg, quality_grade, notes) VALUES (?, date("now"), ?, ?, ?)',
        (farmer_id, quantity, grade, notes)
    )
    conn.commit()

    # Render the notes directly without escaping -- XSS vulnerability
    result_html = f'<h3>Harvest recorded</h3><p>Farmer: {farmer_id}</p><p>Quantity: {quantity} kg</p><p>Notes: {notes}</p>'
    return render_template_string(HARVEST_FORM, results=result_html)


@app.route('/api/farmers', methods=['GET'])
def list_farmers():
    conn = get_db()
    farmers = conn.execute('SELECT * FROM farmers').fetchall()
    return jsonify([dict(f) for f in farmers])


@app.route('/api/payments/<int:farmer_id>', methods=['GET'])
def farmer_payments(farmer_id):
    conn = get_db()
    payments = conn.execute(
        'SELECT * FROM payments WHERE farmer_id = ?', (farmer_id,)
    ).fetchall()
    farmer = conn.execute(
        'SELECT name, cedula, bank_account FROM farmers WHERE id = ?', (farmer_id,)
    ).fetchone()
    return jsonify({
        'farmer': dict(farmer) if farmer else None,
        'payments': [dict(p) for p in payments]
    })


@app.route('/login', methods=['POST'])
def login():
    username = request.form.get('username', '')
    password = request.form.get('password', '')
    conn = get_db()
    user = conn.execute(
        'SELECT * FROM users WHERE username = ? AND password = ?',
        (username, password)
    ).fetchone()
    if user:
        return jsonify({'status': 'authenticated', 'role': user['role']})
    return jsonify({'status': 'failed'}), 401


if __name__ == '__main__':
    init_db()
    app.run(host='0.0.0.0', port=5000, debug=False)