linuxmuster-voucher/script/app.py
2025-05-06 23:26:57 +02:00

216 lines
7.5 KiB
Python

import os
from authlib.integrations.flask_client import OAuth
from flask import Flask, jsonify, request, render_template, redirect, url_for, session, send_from_directory
from werkzeug.middleware.proxy_fix import ProxyFix
from pyunifi.controller import Controller
import uuid
import json
import qrcode
import base64
import io
import datetime
def str_to_bool(value):
"""
Konvertiert einen String in einen booleschen Wert.
"""
return value.lower() in ('true', '1', 'yes')
UNIFI_HOST = os.getenv('UNIFI_HOST', '')
UNIFI_USERNAME = os.getenv('UNIFI_USERNAME', 'admin')
UNIFI_PASSWORD = os.getenv('UNIFI_PASSWORD', '')
UNIFI_PORT = int(os.getenv('UNIFI_PORT', 8443))
UNIFI_SSL_VERIFY = str_to_bool(os.getenv('UNIFI_SSL_VERIFY', 'True'))
UNIFI_SITE_ID = os.getenv('UNIFI_SITE_ID', '')
DEBUG = str_to_bool(os.getenv('DEBUG', 'False'))
LOGO_FILE = os.getenv('LOGO_FILE', '/static/logo.jpg')
WIFI_ICON = os.getenv('WIFI_ICON', '/static/wifi-icon.png')
if DEBUG:
print("DEBUG mode is enabled.")
print(f"UNIFI_HOST: {UNIFI_HOST}")
print(f"UNIFI_USERNAME: {UNIFI_USERNAME}")
print(f"UNIFI_PASSWORD: {UNIFI_PASSWORD}")
print(f"UNIFI_PORT: {UNIFI_PORT}")
print(f"UNIFI_SSL_VERIFY: {UNIFI_SSL_VERIFY}")
print(f"UNIFI_SITE_ID: {UNIFI_SITE_ID}")
print(f"LOGO_FILE: {LOGO_FILE}")
print(f"WIFI_ICON: {WIFI_ICON}")
LOGLEVEL = os.getenv('LOGLEVEL', 'INFO')
def log(message, level='INFO'):
"""
Log a message with the specified log level.
"""
if level == 'DEBUG' and not DEBUG:
return
if level == 'INFO' and LOGLEVEL not in ['DEBUG', 'INFO']:
return
if level == 'WARNING' and LOGLEVEL not in ['DEBUG', 'INFO', 'WARNING']:
return
if level == 'ERROR' and LOGLEVEL not in ['DEBUG', 'INFO', 'WARNING', 'ERROR']:
return
print(f"{level}: {message}")
def check_site_id(site_id):
"""
Check if the given site ID exists in the controller and assign the right one if not.
"""
log(f"Checking site ID: {site_id}", 'DEBUG')
c = Controller(UNIFI_HOST, UNIFI_USERNAME, UNIFI_PASSWORD, port=UNIFI_PORT, ssl_verify=UNIFI_SSL_VERIFY, site_id=UNIFI_SITE_ID)
sites = c.get_sites()
for site in sites:
if site['name'] == site_id:
return site_id
if len(sites) == 1:
print("\n" * 3 , "#" * 60)
print(f"Since there is only one site in your system I can set UNIFI_SITE_ID to {sites[0]['name']}")
print(f"To suppress this message, set environment-variable UNIFI_SITE_ID to {sites[0]['name']}")
print("#" * 60, "\n" * 3)
return sites[0]['name']
else:
print("\n" * 3, "#" * 60)
print(f"Multiple sites found. Please specify a valid site ID.")
print(f"Available site IDs: {', '.join([site['name'] for site in sites])}")
print(f"Setting UNIFI_SITE_ID to {sites[0]['name']}")
print(f"To suppress this message, set environment-variable UNIFI_SITE_ID to the desired site ID")
print("#" * 60, "\n" * 3)
return sites[0]['name']
def get_voucher_list(controller):
"""
Get the list of vouchers from the controller.
"""
vouchers = controller.list_vouchers()
return vouchers
def api_create_voucher(c, note="TEST", time=45, up_bandwidth=5000, down_bandwidth=10000, byte_quota=0):
"""
Create a voucher with the specified name and number of vouchers.
create_voucher(Anzahl Voucher, Anzahl Nutzer (0=ohne Limit), Zeit in Minuten, Upstream Bandwidth, Downstream Bandwidth, Byte Quota, Notiz)
"""
voucher = c.create_voucher(1, 0, time, up_bandwidth=up_bandwidth, down_bandwidth=down_bandwidth, byte_quota=byte_quota, note=note)
code = voucher[0].get('code')
log(f"Created voucher with code: {code}", 'INFO')
return code
# QR-Code für das WLAN generieren:
ssid = os.getenv('WLAN_SSID', 'TANNE')
password = os.getenv('WLAN_WPA2', 'hdndwlanar')
qr_data = f"WIFI:T:WPA;S:{ssid};P:{password};;"
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=0,
)
qr.add_data(qr_data)
qr.make(fit=True)
img = qr.make_image(fill_color="#000000", back_color="transparent")
buffer = io.BytesIO()
img.save(buffer, format="PNG")
buffer.seek(0)
app = Flask(__name__)
# Keycloak-Konfiguration
app.secret_key = 'paiqwrqwrsfuhkcp8sdfisuhckjhyli' # ohne den geht Keycloak nicht
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
app.config['OIDC_CLIENT_ID'] = os.getenv('OIDC_CLIENT_ID')
app.config['OIDC_CLIENT_SECRET'] = os.getenv('OIDC_CLIENT_SECRET')
app.config['OIDC_ISSUER'] = os.getenv('OIDC_ISSUER')
app.config['OIDC_REDIRECT_URI'] = os.getenv('OIDC_REDIRECT_URI')
app.config['SERVER_NAME'] = os.getenv('OIDC_REDIRECT_URI')
oauth = OAuth(app)
oauth.register(
name="keycloak",
client_id=app.config["OIDC_CLIENT_ID"],
client_secret=app.config["OIDC_CLIENT_SECRET"],
server_metadata_url=f"{app.config['OIDC_ISSUER']}/.well-known/openid-configuration",
client_kwargs={"scope": "openid profile email"},
)
# b64encode-Filter registrieren
@app.template_filter('b64encode')
def b64encode_filter(data):
"""
Base64-Encode für Jinja2-Templates.
"""
if isinstance(data, bytes):
return base64.b64encode(data).decode('utf-8')
elif hasattr(data, 'getvalue'): # Prüfen, ob es ein BytesIO-Objekt ist
return base64.b64encode(data.getvalue()).decode('utf-8')
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
@app.route("/")
def home():
if "user" not in session:
return redirect(url_for("login"))
return render_template("index.html", logo=LOGO_FILE, name=session['user']['given_name'])
@app.route("/voucher/<int:minutes>")
def create_voucher(minutes):
if "user" not in session:
return redirect(url_for("login"))
try:
c = Controller(UNIFI_HOST, UNIFI_USERNAME, UNIFI_PASSWORD, port=UNIFI_PORT, ssl_verify=UNIFI_SSL_VERIFY, site_id=check_site_id(UNIFI_SITE_ID))
code = api_create_voucher(c, time=minutes, note=f"{session['user']['preferred_username']} - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# code aufbereiten: 5 Zeichen Bindestrich, 5 Zeichen Bindestrich, 5 Zeichen Bindestrich, 5 Zeichen
code = f"{code[:5]}-{code[5:10]}"
except Exception as e:
log(f"Error creating voucher: {e}", 'ERROR')
return jsonify({"error": str(e)}), 500
return render_template("voucher.html", code=code, minutes=minutes, qr_code=buffer, wifi_icon=WIFI_ICON)
@app.route('/login')
def login():
# Generiere einen eindeutigen nonce-Wert
nonce = str(uuid.uuid4())
session['nonce'] = nonce # Speichere den nonce in der Session
redirect_uri = url_for('authorize', _external=True)
return oauth.keycloak.authorize_redirect(redirect_uri, nonce=nonce)
@app.route('/authorize')
def authorize():
token = oauth.keycloak.authorize_access_token()
print(token)
nonce = session.pop('nonce', None) # Hole den nonce aus der Session
if not nonce:
return "Fehler: nonce fehlt in der Session", 400
print(nonce)
user_info = oauth.keycloak.parse_id_token(token, nonce=nonce)
session['user'] = user_info
print(user_info)
print(session['user'])
return redirect(url_for('home'))
# Logout-Route
@app.route("/logout")
def logout():
session.pop("user", None)
return redirect(app.config["OIDC_ISSUER"] + "/protocol/openid-connect/logout?redirect_uri=" + url_for("home", _external=True))
# Main script
if __name__ == "__main__":
app.run(host="0.0.0.0", port=42425, debug=True)