# All materials
# app.py
# pyapp.py
from flask import Flask, request, jsonify
import random
import time
# Flask application object; the route decorators below register on it.
app = Flask(__name__)
# In-memory storage for sensor readings
# Each entry is a dict with the keys 'tank_id', 'temperature_c',
# 'humidity_pct', 'timestamp' and 'ph'. It is a plain module-level list, so
# its contents exist only for the lifetime of the process.
readings = []
@app.route('/')
def index():
    """Return a JSON description of the service and its endpoints."""
    service_info = {
        'service': 'Fermentation Monitoring API',
        'version': '1.0',
        'endpoints': ['/readings', '/readings/latest', '/health'],
        'note': 'No authentication required -- sensor devices connect directly',
    }
    return jsonify(service_info)
# No authentication -- sensors connect directly
@app.route('/readings', methods=['POST'])
def add_reading():
    """Record one sensor reading posted as a JSON object.

    Any field missing from the body is filled in with a default:
    ``tank_id`` becomes ``'unknown'``, ``timestamp`` becomes the current
    local time formatted as ``YYYY-MM-DDTHH:MM:SS``, and the numeric fields
    get a random placeholder (temperature 18.0-24.0, humidity 60.0-85.0,
    pH 4.0-5.5).

    Returns:
        A ``(response, status)`` pair: ``201`` with the stored reading, or
        ``400`` when the JSON body is not an object.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array/string/number has no .get(); reject it explicitly
        # rather than failing with an unhandled AttributeError (HTTP 500).
        return jsonify({'error': 'JSON body must be an object'}), 400

    reading = {
        'tank_id': data.get('tank_id', 'unknown'),
        'temperature_c': data.get('temperature_c', round(random.uniform(18.0, 24.0), 1)),
        'humidity_pct': data.get('humidity_pct', round(random.uniform(60.0, 85.0), 1)),
        'timestamp': data.get('timestamp', time.strftime('%Y-%m-%dT%H:%M:%S')),
        'ph': data.get('ph', round(random.uniform(4.0, 5.5), 2)),
    }
    readings.append(reading)
    return jsonify({'status': 'recorded', 'reading': reading}), 201
@app.route('/readings', methods=['GET'])
def get_readings():
    """Return stored readings as a JSON list.

    When the ``tank_id`` query parameter is present and non-empty, only
    readings whose ``tank_id`` equals it are returned; otherwise every
    stored reading is returned.
    """
    wanted_tank = request.args.get('tank_id')
    if not wanted_tank:
        return jsonify(readings)
    matching = [entry for entry in readings if entry['tank_id'] == wanted_tank]
    return jsonify(matching)
@app.route('/readings/latest', methods=['GET'])
def latest_readings():
    """Return the most recently stored reading for each tank.

    "Most recent" means last in storage order, not greatest timestamp.
    When nothing has been stored, a JSON object with a ``message`` key is
    returned instead of a list.
    """
    if not readings:
        return jsonify({'message': 'No readings recorded yet'})
    # Later entries overwrite earlier ones under the same key, so each tank
    # ends up mapped to the last reading stored for it.
    newest_by_tank = {entry['tank_id']: entry for entry in readings}
    return jsonify(list(newest_by_tank.values()))
@app.route('/health', methods=['GET'])
def health():
    """Report an ``ok`` status together with the number of stored readings."""
    status = {'status': 'ok', 'readings_count': len(readings)}
    return jsonify(status)
# Generate some initial readings
def seed_readings():
    """Append five sample readings for each of three tanks to ``readings``.

    Tanks are ``tank-01`` to ``tank-03``. Reading ``i`` (0-4) of each tank
    is stamped ``2024-02-(15+i)T(06+i):00:00``; the numeric fields are
    random values (temperature 19.0-23.0, humidity 65.0-80.0, pH 4.2-5.0).
    """
    tanks = ['tank-01', 'tank-02', 'tank-03']
    for tank in tanks:
        for i in range(5):
            readings.append({
                'tank_id': tank,
                'temperature_c': round(random.uniform(19.0, 23.0), 1),
                'humidity_pct': round(random.uniform(65.0, 80.0), 1),
                # Zero-pad the hour so the value is a well-formed
                # YYYY-MM-DDTHH:MM:SS string, matching the format that
                # add_reading generates with '%H'.
                'timestamp': f'2024-02-{15 + i}T{6 + i:02d}:00:00',
                'ph': round(random.uniform(4.2, 5.0), 2),
            })
if __name__ == '__main__':
    # Load the sample data first so the API has readings from the first request.
    seed_readings()
    # Listen on every interface, port 8080, with the debugger off.
    server_options = {'host': '0.0.0.0', 'port': 8080, 'debug': False}
    app.run(**server_options)