# Learn by Directing AI
# All materials
#
# app.py
#
# py: app.py
from flask import Flask, request, render_template, redirect, url_for, session, g
import sqlite3
import os

app = Flask(__name__)
# Key used to sign the session cookie. A value supplied through the
# SECRET_KEY environment variable takes precedence; the literal is kept only
# as a fallback so existing setups keep working. Anyone who can read this
# source can forge sessions signed with the fallback, so set SECRET_KEY in
# any real deployment.
app.secret_key = os.environ.get('SECRET_KEY', 'rsn-ehr-secret-key-2024')

# Filesystem path of the SQLite database file used by get_db().
DATABASE = '/app/ehr.db'


def get_db():
    """Return the SQLite connection stored on ``g``, opening it on first use.

    The connection is created from ``DATABASE`` and configured so that rows
    come back as ``sqlite3.Row`` objects (addressable by column name).
    """
    # Reuse the connection if one has already been stashed on ``g``.
    if 'db' in g:
        return g.db

    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    g.db = connection
    return connection


@app.teardown_appcontext
def close_db(exception):
    """Close and forget the connection stored on ``g``, if there is one.

    Registered as an app-context teardown handler; the ``exception``
    argument is accepted but not used.
    """
    connection = g.pop('db', None)
    if connection is None:
        # Nothing was opened, so there is nothing to release.
        return
    connection.close()


@app.route('/')
def index():
    """Redirect to the patient list when logged in, otherwise to the login page."""
    endpoint = 'patients' if 'user' in session else 'login'
    return redirect(url_for(endpoint))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On a matching ``users`` row the username and the row's ``role`` are
    stored in the session and the browser is redirected to the patient list.
    Otherwise the form is rendered again with an error message.
    """
    if request.method != 'POST':
        # Plain page view: render the form without an error.
        return render_template('index.html', error=None)

    supplied_name = request.form.get('username', '')
    supplied_password = request.form.get('password', '')

    # NOTE(review): the submitted password is compared directly against the
    # stored column value, which suggests passwords are kept unhashed --
    # confirm against the schema.
    account = get_db().execute(
        'SELECT * FROM users WHERE username = ? AND password = ?',
        (supplied_name, supplied_password)
    ).fetchone()

    if not account:
        return render_template('index.html', error='Invalid credentials')

    session['user'] = supplied_name
    session['role'] = account['role']
    return redirect(url_for('patients'))


@app.route('/patients')
def patients():
    """Render every patient (id, name, clinic, diagnosis) ordered by name.

    Requests without a ``user`` in the session are redirected to the login
    page.
    """
    if 'user' not in session:
        return redirect(url_for('login'))

    cursor = get_db().execute(
        'SELECT id, name, clinic, diagnosis FROM patients ORDER BY name'
    )
    return render_template('patients.html', patients=cursor.fetchall())


@app.route('/search')
def search():
    """Render patients whose name or diagnosis contains the ``q`` query parameter.

    Requests without a ``user`` in the session are redirected to the login
    page. If the database query fails, the error is logged and an empty
    result list is rendered instead of raising.
    """
    if 'user' not in session:
        return redirect(url_for('login'))
    q = request.args.get('q', '')
    # Bind the search term as a parameter instead of splicing it into the SQL
    # text, so user input can never change the structure of the statement.
    # LIKE wildcards typed by the user ('%' and '_') still act as wildcards,
    # exactly as they did before.
    pattern = f'%{q}%'
    db = get_db()
    try:
        results = db.execute(
            'SELECT id, name, clinic, diagnosis FROM patients '
            'WHERE name LIKE ? OR diagnosis LIKE ?',
            (pattern, pattern)
        ).fetchall()
    except sqlite3.Error:
        # Keep the page usable on a database failure, but leave a trace in
        # the application log rather than silently discarding the error.
        app.logger.exception('Patient search failed')
        results = []
    return render_template('patients.html', patients=results, query=q)


@app.route('/records/<int:patient_id>')
def records(patient_id):
    """Render one patient's record together with its notes, newest first.

    Requests without a ``user`` in the session are redirected to the login
    page.
    """
    if 'user' not in session:
        return redirect(url_for('login'))

    connection = get_db()
    lookup_key = (patient_id,)

    # NOTE(review): when no patient has this id, ``patient_row`` is None and
    # is passed to the template as-is -- confirm the template handles that.
    patient_row = connection.execute(
        'SELECT * FROM patients WHERE id = ?', lookup_key
    ).fetchone()
    note_rows = connection.execute(
        'SELECT * FROM notes WHERE patient_id = ? ORDER BY created_at DESC',
        lookup_key
    ).fetchall()

    return render_template('record.html', patient=patient_row, notes=note_rows)


@app.route('/records/<int:patient_id>/add_note', methods=['POST'])
def add_note(patient_id):
    """Attach a note from the ``note`` form field to the given patient.

    The note is stored with the session's user as author and a timestamp
    produced by SQLite's ``datetime('now')``; the browser is then redirected
    back to the patient's record. Requests without a ``user`` in the session
    are redirected to the login page.
    """
    if 'user' not in session:
        return redirect(url_for('login'))
    content = request.form.get('note', '')
    db = get_db()
    # ``with db`` commits when the insert succeeds and rolls back if it raises.
    with db:
        db.execute(
            # 'now' must be a single-quoted SQL string literal: double quotes
            # denote identifiers, and SQLite only treats "now" as a string
            # through a legacy fallback that some builds disable.
            "INSERT INTO notes (patient_id, author, content, created_at) "
            "VALUES (?, ?, ?, datetime('now'))",
            (patient_id, session['user'], content)
        )
    return redirect(url_for('records', patient_id=patient_id))


@app.route('/logout')
def logout():
    """Empty the session and send the browser back to the login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)


if __name__ == '__main__':
    # When run as a script: create the database through init_db if the file
    # does not exist yet, then start the server on all interfaces, port 5000.
    database_missing = not os.path.exists(DATABASE)
    if database_missing:
        import init_db
        init_db.initialize()
    app.run(debug=False, host='0.0.0.0', port=5000)