#!/usr/bin/env python # -*- coding: utf-8 # Required modules import os import inspect import random import binascii import bcrypt from flask import Flask, request, session, g, redirect, url_for, abort, render_template, flash from functools import wraps # Optionnal modules import psycopg2 from flask_sqlalchemy import SQLAlchemy ######################################################################## # App settings ######################################################################## app = Flask(__name__) # Path to static files app.static_url_path='/static' # Set debug mode to False for production app.debug = True # Various configuration settings belong here (optionnal) app.config.from_pyfile('config.local.py') # Generate a new key: head -n 40 /dev/urandom | md5sum | cut -d' ' -f1 app.secret_key = '9ac80548e3a8d8dfd1aefcd9a3a73473' # Feel free to use SQLAlchemy (or not) db = SQLAlchemy(app) ######################################################################## # Sample user database ######################################################################## class Tetawebapp_users(db.Model): __tablename__ = 'participer_thsf_users' id = db.Column(db.Integer, primary_key=True) mail = db.Column(db.Text, nullable=False) password = db.Column(db.Text, nullable=False) name = db.Column(db.Text, nullable=True) phone = db.Column(db.Text, nullable=True) diet = db.Column(db.Text, nullable=True) is_admin = db.Column(db.Integer, nullable=False, default=0) class Tetawebapp_roles(db.Model): __tablename__ = 'participer_thsf_roles' id = db.Column(db.Integer, primary_key=True) role = db.Column(db.Text, nullable=False) description = db.Column(db.Text, nullable=False) class Tetawebapp_turns(db.Model): __tablename__ = 'participer_thsf_turns' id = db.Column(db.Integer, primary_key=True) role_id = db.Column(db.Integer, db.ForeignKey('participer_thsf_roles.id'), nullable=False) user_id = db.Column(db.Integer, db.ForeignKey('participer_thsf_users.id'), nullable=True) wday = db.Column(db.Enum('Jeudi', 'Vendredi', 'Samedi', 'Dimanche'), nullable=False) start_time = db.Column(db.Time, nullable=False) end_time = db.Column(db.Time, nullable=False) ######################################################################## # Menu and navigation management ######################################################################## def get_menu(page): """ The main menu is a list of lists in the followin format: [unicode caption, {unicode URL endpoint: [unicode route, ...]}, int 0] - The URL end point is the URL where to point to (href) - One of the routes MUST match the route called by request - The int 0 is used to determine which menu entry is actally called. The value MUST be 0.""" menu = [[u'Accueil', {u'/': [u'/']}, 0], [u'Mon compte', {u'/account': [u'/account', u'/account/update']}, 0], [u'Mes tours de staff', {u'/turns': [u'/turns']}, 0], [u'Feuilles de staff', {u'/staff_sheets': [u'/staff_sheet']}, 0], [u'Déconnexion', {u'/logout': [u'/logout']}, 0], ] if session['is_admin']: menu = [[u'Accueil', {u'/': [u'/']}, 0], [u'Tours de staff', {u'/turns': [u'/turns', u'/turn/', u'/turn/new', u'/turn/add', u'/turn/delete/', u'/turn/update/']}, 0], [u'Feuilles de staff', {u'/staff_sheets': [u'/staff_sheet']}, 0], [u'Liste des staffers', {u'/users': [u'/users', u'/account/', u'/account/delete/']}, 0], [u'Déconnexion', {u'/logout': [u'/logout']}, 0], ] #~ print '[+] Page: %s' % page for item in menu: for url in item[1]: for route in item[1][url]: #~ print " [+] Route: %s" %route if route == page: #~ print " [+] Selected page: %s" % page item[2] = 1 return menu # This should never happen return menu def get_navbar(page, selected): """ The horizontal navbar is a list of lists in the followin format: [unicode caption, {unicode URL endpoint: [unicode route, ...]}, int 0] - The URL end point is the URL where to point to (href) - One of the routes MUST match the route called by request - The int 0 is used to de """ navbars = [[u'First article', {u'/articles/1': [u'/articles', u'/articles/']}, 0, 0], [u'Second article', {u'/articles/2': [u'/articles', u'/articles/']}, 0, 0], [u'Third article', {u'/articles/3': [u'/articles', u'/articles/']}, 0, 0] ] navbar = [] for item in navbars: for url in item[1]: if url == selected: item[2] = 1 for route in item[1][url]: if route == page: navbar.append(item) navbar[len(navbar) - 1][3] = 1 return navbar ######################################################################## # Session management ######################################################################## def sync_session(request, session): """ Synchronize cookies with session """ for key in request.cookies: session[key] = request.cookies[key].encode('utf8') def sync_cookies(response, session): """ Synchronize session with cookies """ for key in session: response.set_cookie(key, value=str(session[key])) def check_session(func): """ Check if the session has required token cookie set. If not, redirects to the login page. """ @wraps(func) def check(*args, **kwargs): try: if session['token'] == request.cookies['token'] and len(session['token']) > 0: # User is logged in and identified return func(*args, **kwargs) else: # User is not logged in or session expired session['token'] = '' response = app.make_response(render_template('login_or_register.html', message='')) sync_cookies(response, session) return response except KeyError: # User is not logged in return render_template('login_or_register.html', message='') return check def check_login(login, password): """ Puts the login verification code here """ hashed_password = bcrypt.hashpw(password, bcrypt.gensalt()) stored_hash = Tetawebapp_users.query.filter_by(mail=login).with_entities(Tetawebapp_users.password).first() is_admin = Tetawebapp_users.query.filter_by(mail=login).with_entities(Tetawebapp_users.is_admin).first() if stored_hash: if bcrypt.checkpw(password, stored_hash[0].encode('utf-8')): session['is_admin'] = is_admin[0] return True return False def register_user(login, password, confirm): """ Register new user """ if password != confirm: # Password does not match confirmation print "[+] Password mismatch confirmation" return False check_user = Tetawebapp_users.query.filter_by(mail=login).count() if check_user != 0: # User already exists print "[+] User already exists" return False hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) user = Tetawebapp_users(mail=login.encode('utf8'), password=hashed_password) try: db.session.add(user) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at register_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def update_user(login, password, confirm, name, phone, diet): """ Update user infos with provided data """ if password != confirm: # Password does not match confirmation print "[+] Password mismatch confirmation" return False check_user = Tetawebapp_users.query.filter_by(mail=login).count() if check_user == 0: # User does not exist print "[+] User does not exist" return False user = Tetawebapp_users.query.filter_by(mail=login).first() if len(password) > 0: # User requested password modification hashed_password = bcrypt.hashpw(password, bcrypt.gensalt()) setattr(user, 'password', hashed_password) # Password has been updated if necessary # Now let's update other data setattr(user, 'name', name) setattr(user, 'phone', phone) setattr(user, 'diet', diet) try: db.session.add(user) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at update_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def update_user_by_id(user_id, login, password, confirm, name, phone, diet): """ Update user infos with provided data """ if password != confirm: # Password does not match confirmation print "[+] Password mismatch confirmation" return False check_user = Tetawebapp_users.query.filter_by(id=user_id).count() if check_user == 0: # User does not exist print "[+] User does not exist" return False user = Tetawebapp_users.query.filter_by(id=user_id).first() if len(password) > 0: # User requested password modification hashed_password = bcrypt.hashpw(password, bcrypt.gensalt()) setattr(user, 'password', hashed_password) # Password has been updated if necessary # Now let's update other data setattr(user, 'name', name) setattr(user, 'phone', phone) setattr(user, 'diet', diet) try: db.session.add(user) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at update_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def delete_user(user_id): """ Delete user """ try: Tetawebapp_users.query.filter_by(id=int(user_id)).delete() db.session.commit() return True except ValueError as e: return False except Exception as e: db.session.rollback() print "[+] Error at delete_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False def save_turn(role_id, day, start, end): """ Save a new turn """ turn = Tetawebapp_turns(role_id=role_id.encode('utf-8'), wday=day.encode('utf-8'), start_time=start.encode('utf-8'), end_time=end.encode('utf-8'), ) try: db.session.add(turn) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at save_turn:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def update_turn_by_id(turn_id, role_id, wday, start, end): """ Update turn with provided data """ check_turn = Tetawebapp_turns.query.filter_by(id=turn_id).count() if check_turn == 0: # User does not exist print "[+] User does not exist" return False turn = Tetawebapp_turns.query.filter_by(id=turn_id).first() setattr(turn, 'role_id', role_id) setattr(turn, 'wday', wday) setattr(turn, 'start_time', start) setattr(turn, 'end_time', end) try: db.session.add(turn) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at update_turn:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def drop_turn(turn_id): """ Delete staff turn """ try: Tetawebapp_turns.query.filter_by(id=int(turn_id)).delete() db.session.commit() return True except ValueError as e: print e return False except Exception as e: db.session.rollback() print "[+] Error at drop_turn:" print "------------------------------" print "%s" % e.message print "------------------------------" return False def check_user_info(): """ Check user info and send appropriate message if info are not complete""" message = '' user = Tetawebapp_users.query.filter_by(mail=session['login']).first() name = user.name phone = user.phone diet = user.diet if name == None or phone == None or diet == None or \ len(name) == 0 or len(phone) == 0 or len(diet) == 0: message = "Vos informations personnelles ne sont pas complètement renseignées. N'oubliez pas de remplir votre fiche située dans la section 'Mon compte'" return message.decode('utf-8') def gen_token(): """ Generate a random token to be stored in session and cookie """ token = binascii.hexlify(os.urandom(42)) return token ######################################################################## # Routes: # ------- # Except for the index function, the function name MUST have the same # name than the URL endpoint to make the menu work properly ######################################################################## @app.errorhandler(404) def page_not_found(e): """ 404 not found """ return render_template('error.html'), 404 @app.route("/login", methods=['GET', 'POST']) def login(): """ Login """ try: login = request.form.get('login').encode('utf-8') password = request.form.get('password').encode('utf-8') if check_login(login, password): # Generate and store a token in session session['token'] = gen_token() session['login'] = login # Return user to index page page = '/' menu = get_menu(page) message = check_user_info() response = app.make_response(render_template('index.html', menu=menu, message=message, login=login)) # Push token to cookie sync_cookies(response, session) return response # Credentials are not valid response = app.make_response(render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide")) session['token'] = '' sync_cookies(response, session) return response except AttributeError: return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/register", methods=['GET', 'POST']) def register(): """ Allow self registration """ try: login = request.form.get('login').encode('utf-8') password = request.form.get('password').encode('utf-8') confirm = request.form.get('confirm').encode('utf-8') if register_user(login, password, confirm): # Generate and store a token in session session['token'] = gen_token() session['login'] = login # Return user to index page page = '/' menu = get_menu(page) message = check_user_info() response = app.make_response(render_template('index.html', menu=menu, login=login, message=message)) # Push token to cookie sync_cookies(response, session) return response # Error while registering user message = "Erreur lors de l'enregsitrement: L'utilisateur existe t-il déjà ?".decode('utf-8') response = app.make_response(render_template('login_or_register.html', message=message)) session['token'] = '' sync_cookies(response, session) return response except AttributeError: return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/", methods=['GET', 'POST']) @check_session def index(): """ Index page """ page = str(request.url_rule) menu = get_menu(page) message = check_user_info() return render_template('index.html', menu=menu, message=message, login=session['login']) @app.route("/account", methods=['GET', 'POST']) @check_session def account(): """ Account page """ page = str(request.url_rule) menu = get_menu(page) user = Tetawebapp_users.query.filter_by(mail=session['login']).first() mail = '' if user.mail == None else user.mail name = '' if user.name == None else user.name phone = '' if user.phone == None else user.phone diet = '' if user.diet == None else user.diet message = check_user_info() return render_template('account.html', menu=menu, mail=mail, name=name, phone=phone, diet=diet, message=message) @app.route("/account/update", methods=['GET', 'POST']) @check_session def update_account(): """ Update current account """ try: page = str(request.url_rule) menu = get_menu(page) login = session['login'] password = request.form.get('password').encode('utf-8') confirm = request.form.get('confirm').encode('utf-8') name = request.form.get('name').encode('utf-8') phone = request.form.get('phone').encode('utf-8') diet = request.form.get('diet').encode('utf-8') if update_user(login, password, confirm, name, phone, diet): message = check_user_info() else: message = "Erreur lors de l'enregistrement des données." return render_template('account.html', menu=menu, mail=login.decode('utf-8'), name=name.decode('utf-8'), phone=phone.decode('utf-8'), diet=diet.decode('utf-8'), message=message) except AttributeError: return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/logout", methods=['GET', 'POST']) @check_session def logout(): """ Logout user """ # Remove session token session['token'] = None session['login'] = None session['is_admin'] = 0 # Return user to index page response = app.make_response(render_template('login_or_register.html', message='')) # Push token to cookie sync_cookies(response, session) return response ######################################################################## # Admin zone ######################################################################## @app.route("/users", methods=['GET', 'POST']) @check_session def list_users(): """ Users list """ page = str(request.url_rule) menu = get_menu(page) message = check_user_info() staffers = Tetawebapp_users.query.filter_by(is_admin=0).order_by(Tetawebapp_users.name).all() return render_template('list_users.html', menu=menu, staffers=staffers, message=message) @app.route("/account/", methods=['GET', 'POST']) @check_session def account_by_id(ID): """ Arcticles page """ try: if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) message = "ID de l'utilisateur non conforme" staffers = Tetawebapp_users.query.filter_by(is_admin=0).order_by(Tetawebapp_users.name).all() user_id = int(ID.encode('utf-8')) user = Tetawebapp_users.query.filter_by(id=user_id).first() return render_template('account_by_id.html', menu=menu, user=user) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except ValueError: # ID is not an integer return render_template('list_users.html', menu=menu, staffers=staffers, message=message) @app.route("/account/update/", methods=['GET', 'POST']) @check_session def update_account_by_id(ID): """ Update given account """ try: if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) login = session['login'] password = request.form.get('password').encode('utf-8') confirm = request.form.get('confirm').encode('utf-8') name = request.form.get('name').encode('utf-8') phone = request.form.get('phone').encode('utf-8') diet = request.form.get('diet').encode('utf-8') message = "ID de l'utilisateur non conforme" staffers = Tetawebapp_users.query.filter_by(is_admin=0).order_by(Tetawebapp_users.name).all() user_id = int(ID.encode('utf-8')) if update_user_by_id(user_id, login, password, confirm, name, phone, diet): user = Tetawebapp_users.query.filter_by(id=ID).first() message = check_user_info() else: message = "Erreur lors de l'enregistrement des données." return render_template('account_by_id.html', menu=menu, user=user,message=message) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except ValueError: # ID is not an integer return render_template('list_users.html', menu=menu, staffers=staffers, message=message) @app.route("/account/delete/", methods=['GET', 'POST']) @check_session def delete_account(ID): """ Delete given account """ try: if session['is_admin']: message = "Erreur lors de la suppression.".decode('utf-8') page = str(request.url_rule) menu = get_menu(page) staffers = Tetawebapp_users.query.filter_by(is_admin=0).order_by(Tetawebapp_users.name).all() user_id = int(ID.encode('utf-8')) if delete_user(user_id): message = '' staffers = Tetawebapp_users.query.filter_by(is_admin=0).order_by(Tetawebapp_users.name).all() return render_template('list_users.html', menu=menu, staffers=staffers, message=message) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except ValueError: # ID is not an integer return render_template('list_users.html', menu=menu, staffers=staffers, message=message) @app.route("/turns", methods=['GET', 'POST']) @check_session def list_turn(): """ List staff turns """ try: if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() message = '' return render_template('list_turns.html', menu=menu, page=page, turns=turns, message=message) except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/turn/new", methods=['GET', 'POST']) @check_session def new_turn(): """ New turn form """ try: if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) roles = Tetawebapp_roles.query.order_by(Tetawebapp_roles.id).all() days = ['Jeudi', 'Vendredi', 'Samedi', 'Dimanche'] return render_template('new_turn.html', menu=menu, page=page, roles=roles, days=days) except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/turn/add", methods=['GET', 'POST']) @check_session def add_turn(): """ Add staff turn """ try: if session['is_admin']: role_id = request.form.get('role_id').encode('utf-8') day = request.form.get('day').encode('utf-8') start = request.form.get('start').encode('utf-8') end = request.form.get('end').encode('utf-8') page = str(request.url_rule) menu = get_menu(page) turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() message = "Erreur lors de l'enregistrement.".decode('utf-8') if save_turn(role_id, day, start, end): turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() message='' return render_template('list_turns.html', menu=menu, page=page, turns=turns, message=message) # Error while saving turn roles = Tetawebapp_roles.query.order_by(Tetawebapp_roles.id).all() days = ['Jeudi', 'Vendredi', 'Samedi', 'Dimanche'] return render_template('new_turn.html', menu=menu, page=page, roles=roles, days=days, message=message) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError as e: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/turn/", methods=['GET', 'POST']) @check_session def turn_by_id(ID): try: if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) roles = Tetawebapp_roles.query.order_by(Tetawebapp_roles.id).all() days = ['Jeudi', 'Vendredi', 'Samedi', 'Dimanche'] message = 'ID du tour de staff non conforme' turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() turn_id = int(ID.encode('utf-8')) turn = Tetawebapp_turns.query.filter_by(id=ID).first() return render_template('turn_by_id.html', menu=menu, page=page, turn=turn, roles=roles, days=days) except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except ValueError: # ID is not an integer return render_template('list_turns.html', menu=menu, page=page, turns=turns, message=message) @app.route("/turn/update/", methods=['GET', 'POST']) @check_session def update_turn(ID): """ Update given staff turn """ try: role_id = request.form.get('role_id').encode('utf-8') day = request.form.get('day').encode('utf-8') start = request.form.get('start').encode('utf-8') end = request.form.get('end').encode('utf-8') if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() message = "Erreur lors de l'enregistrement.".decode('utf-8') turn_id = int(ID.encode('utf-8')) if update_turn_by_id(turn_id, role_id, day, start, end): turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() message = '' return render_template('list_turns.html', menu=menu, page=page, turns=turns, message=message) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError as e: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except ValueError: # ID is not an integer return render_template('list_turns.html', menu=menu, page=page, turns=turns, message=message) @app.route("/turn/delete/", methods=['GET', 'POST']) @check_session def delete_turn(ID): """ Delete given staff turn """ try: if session['is_admin']: message = 'Erreur lors de la suppression.' page = str(request.url_rule) menu = get_menu(page) turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() turn_id = int(ID.encode('utf-8')) if drop_turn(turn_id): message = '' turns = Tetawebapp_turns.query.join(Tetawebapp_roles, Tetawebapp_turns.role_id==Tetawebapp_roles.id).add_columns(Tetawebapp_roles.role).order_by(Tetawebapp_turns.role_id).all() return render_template('list_turns.html', menu=menu, turns=turns, message=message) return render_template('list_turns.html', menu=menu, turns=turns, message=message) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except ValueError: # ID is not an integer return render_template('list_turns.html', menu=menu, page=page, turns=turns, message=message) ######################################################################## # Main ######################################################################## if __name__ == '__main__': app.run(host='0.0.0.0')