#!/usr/bin/env python # -*- coding: utf-8 # Required modules import os import inspect import random import binascii import bcrypt from flask import Flask, request, session, g, redirect, url_for, abort, render_template, flash from functools import wraps # Optionnal modules import psycopg2 from flask_sqlalchemy import SQLAlchemy ######################################################################## # App settings ######################################################################## app = Flask(__name__) # Path to static files app.static_url_path='/static' # Set debug mode to False for production app.debug = True # Various configuration settings belong here (optionnal) app.config.from_pyfile('config.local.py') # Generate a new key: head -n 40 /dev/urandom | md5sum | cut -d' ' -f1 app.secret_key = '9ac80548e3a8d8dfd1aefcd9a3a73473' # Feel free to use SQLAlchemy (or not) db = SQLAlchemy(app) ######################################################################## # Sample user database ######################################################################## class Tetawebapp_users(db.Model): __tablename__ = 'participer_thsf_users' id = db.Column(db.Integer, primary_key=True) mail = db.Column(db.Text, nullable=False) password = db.Column(db.Text, nullable=False) name = db.Column(db.Text, nullable=True) phone = db.Column(db.Text, nullable=True) diet = db.Column(db.Text, nullable=True) is_admin = db.Column(db.Integer, nullable=False, default=0) class Tetawebapp_roles(db.Model): __tablename__ = 'participer_thsf_roles' id = db.Column(db.Integer, primary_key=True) role = db.Column(db.Text, nullable=False) description = db.Column(db.Text, nullable=False) class Tetawebapp_turns(db.Model): __tablename__ = 'participer_thsf_turns' id = db.Column(db.Integer, primary_key=True) role_id = db.Column(db.Integer, db.ForeignKey('participer_thsf_roles.id'), nullable=False) user_id = db.Column(db.Integer, db.ForeignKey('participer_thsf_users.id'), nullable=True) wday = db.Column(db.Enum('J', 'V', 'S', 'D'), nullable=False) start_time = db.Column(db.Time, nullable=False) end_time = db.Column(db.Time, nullable=False) ######################################################################## # Menu and navigation management ######################################################################## def get_menu(page): """ The main menu is a list of lists in the followin format: [unicode caption, {unicode URL endpoint: [unicode route, ...]}, int 0] - The URL end point is the URL where to point to (href) - One of the routes MUST match the route called by request - The int 0 is used to determine which menu entry is actally called. The value MUST be 0.""" menu = [[u'Accueil', {u'/': [u'/']}, 0], [u'Mon compte', {u'/account': [u'/account', u'/account/update']}, 0], [u'Mes tours de staff', {u'/turn': [u'/turn']}, 0], [u'Feuilles de staff', {u'/staff_sheets': [u'/staff_sheet']}, 0], [u'Déconnexion', {u'/logout': [u'/logout']}, 0], ] if session['is_admin']: menu = [[u'Accueil', {u'/': [u'/']}, 0], [u'Tours de staff', {u'/staff': [u'/staff']}, 0], [u'Feuilles de staff', {u'/staff_sheets': [u'/staff_sheet']}, 0], [u'Liste des staffers', {u'/users': [u'/users', u'/account/']}, 0], [u'Déconnexion', {u'/logout': [u'/logout']}, 0], ] for item in menu: for url in item[1]: for route in item[1][url]: if route == page: item[2] = 1 return menu # This should never happen return menu def get_navbar(page, selected): """ The horizontal navbar is a list of lists in the followin format: [unicode caption, {unicode URL endpoint: [unicode route, ...]}, int 0] - The URL end point is the URL where to point to (href) - One of the routes MUST match the route called by request - The int 0 is used to de """ navbars = [[u'First article', {u'/articles/1': [u'/articles', u'/articles/']}, 0, 0], [u'Second article', {u'/articles/2': [u'/articles', u'/articles/']}, 0, 0], [u'Third article', {u'/articles/3': [u'/articles', u'/articles/']}, 0, 0] ] navbar = [] for item in navbars: for url in item[1]: if url == selected: item[2] = 1 for route in item[1][url]: if route == page: navbar.append(item) navbar[len(navbar) - 1][3] = 1 return navbar ######################################################################## # Session management ######################################################################## def sync_session(request, session): """ Synchronize cookies with session """ for key in request.cookies: session[key] = request.cookies[key].encode('utf8') def sync_cookies(response, session): """ Synchronize session with cookies """ for key in session: response.set_cookie(key, value=str(session[key])) def check_session(func): """ Check if the session has required token cookie set. If not, redirects to the login page. """ @wraps(func) def check(*args, **kwargs): try: if session['token'] == request.cookies['token'] and len(session['token']) > 0: # User is logged in and identified return func(*args, **kwargs) else: # User is not logged in or session expired session['token'] = '' response = app.make_response(render_template('login_or_register.html', message='')) sync_cookies(response, session) return response except KeyError: # User is not logged in return render_template('login_or_register.html', message='') return check def check_login(login, password): """ Puts the login verification code here """ hashed_password = bcrypt.hashpw(password, bcrypt.gensalt()) stored_hash = Tetawebapp_users.query.filter_by(mail=login).with_entities(Tetawebapp_users.password).first() is_admin = Tetawebapp_users.query.filter_by(mail=login).with_entities(Tetawebapp_users.is_admin).first() if stored_hash: if bcrypt.checkpw(password, stored_hash[0].encode('utf-8')): session['is_admin'] = is_admin[0] return True return False def register_user(login, password, confirm): """ Register new user """ if password != confirm: # Password does not match confirmation print "[+] Password mismatch confirmation" return False check_user = Tetawebapp_users.query.filter_by(mail=login).count() if check_user != 0: # User already exists print "[+] User already exists" return False hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) user = Tetawebapp_users(mail=login.encode('utf8'), password=hashed_password) try: db.session.add(user) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at register_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def update_user(login, password, confirm, name, phone, diet): """ Update user infos with provided data """ if password != confirm: # Password does not match confirmation print "[+] Password mismatch confirmation" return False check_user = Tetawebapp_users.query.filter_by(mail=login).count() if check_user == 0: # User does not exist print "[+] User does not exist" return False user = Tetawebapp_users.query.filter_by(mail=login).first() if len(password) > 0: # User requested password modification hashed_password = bcrypt.hashpw(password, bcrypt.gensalt()) setattr(user, 'password', hashed_password) # Password has been updated if necessary # Now let's update other data setattr(user, 'name', name) setattr(user, 'phone', phone) setattr(user, 'diet', diet) try: db.session.add(user) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at update_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def update_user_by_id(user_id, login, password, confirm, name, phone, diet): """ Update user infos with provided data """ if password != confirm: # Password does not match confirmation print "[+] Password mismatch confirmation" return False check_user = Tetawebapp_users.query.filter_by(id=user_id).count() if check_user == 0: # User does not exist print "[+] User does not exist" return False user = Tetawebapp_users.query.filter_by(id=user_id).first() if len(password) > 0: # User requested password modification hashed_password = bcrypt.hashpw(password, bcrypt.gensalt()) setattr(user, 'password', hashed_password) # Password has been updated if necessary # Now let's update other data setattr(user, 'name', name) setattr(user, 'phone', phone) setattr(user, 'diet', diet) try: db.session.add(user) commit = db.session.commit() except Exception as e: db.session.rollback() print "[+] Error at update_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False if commit != None: return False return True def delete_user(user_id): """ Delete user """ try: Tetawebapp_users.query.filter_by(id=int(user_id)).delete() db.session.commit() return True except ValueError as e: return False except Exception as e: db.session.rollback() print "[+] Error at delete_user:" print "------------------------------" print "%s" % e.message print "------------------------------" return False def check_user_info(): """ Check user info and send appropriate message if info are not complete""" message = '' user = Tetawebapp_users.query.filter_by(mail=session['login']).first() name = user.name phone = user.phone diet = user.diet if name == None or phone == None or diet == None or \ len(name) == 0 or len(phone) == 0 or len(diet) == 0: message = "Vos informations personnelles ne sont pas complètement renseignées. N'oubliez pas de remplir votre fiche située dans la section 'Mon compte'" return message.decode('utf-8') def gen_token(): """ Generate a random token to be stored in session and cookie """ token = binascii.hexlify(os.urandom(42)) return token ######################################################################## # Routes: # ------- # Except for the index function, the function name MUST have the same # name than the URL endpoint to make the menu work properly ######################################################################## @app.errorhandler(404) def page_not_found(e): """ 404 not found """ return render_template('error.html'), 404 @app.route("/login", methods=['GET', 'POST']) def login(): try: login = request.form.get('login').encode('utf-8') password = request.form.get('password').encode('utf-8') if check_login(login, password): # Generate and store a token in session session['token'] = gen_token() session['login'] = login # Return user to index page page = '/' menu = get_menu(page) message = check_user_info() response = app.make_response(render_template('index.html', menu=menu, message=message, login=login)) # Push token to cookie sync_cookies(response, session) return response # Credentials are not valid response = app.make_response(render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide")) session['token'] = '' sync_cookies(response, session) return response except AttributeError: return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/register", methods=['GET', 'POST']) def register(): try: login = request.form.get('login').encode('utf-8') password = request.form.get('password').encode('utf-8') confirm = request.form.get('confirm').encode('utf-8') if register_user(login, password, confirm): # Generate and store a token in session session['token'] = gen_token() session['login'] = login # Return user to index page page = '/' menu = get_menu(page) message = check_user_info() response = app.make_response(render_template('index.html', menu=menu, login=login, message=message)) # Push token to cookie sync_cookies(response, session) return response # Error while registering user message = "Erreur lors de l'enregsitrement: L'utilisateur existe t-il déjà ?".decode('utf-8') response = app.make_response(render_template('login_or_register.html', message=message)) session['token'] = '' sync_cookies(response, session) return response except AttributeError: return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/", methods=['GET', 'POST']) @check_session def index(): """ Index page """ page = str(request.url_rule) menu = get_menu(page) message = check_user_info() return render_template('index.html', menu=menu, message=message, login=session['login']) @app.route("/account", methods=['GET', 'POST']) @check_session def account(): """ Account page """ page = str(request.url_rule) menu = get_menu(page) user = Tetawebapp_users.query.filter_by(mail=session['login']).first() mail = '' if user.mail == None else user.mail name = '' if user.name == None else user.name phone = '' if user.phone == None else user.phone diet = '' if user.diet == None else user.diet message = check_user_info() return render_template('account.html', menu=menu, mail=mail, name=name, phone=phone, diet=diet, message=message) @app.route("/account/update", methods=['GET', 'POST']) @check_session def update_account(): """ Update current account """ try: page = str(request.url_rule) menu = get_menu(page) login = session['login'] password = request.form.get('password').encode('utf-8') confirm = request.form.get('confirm').encode('utf-8') name = request.form.get('name').encode('utf-8') phone = request.form.get('phone').encode('utf-8') diet = request.form.get('diet').encode('utf-8') if update_user(login, password, confirm, name, phone, diet): message = check_user_info() else: message = "Erreur lors de l'enregistrement des données." return render_template('account.html', menu=menu, mail=login.decode('utf-8'), name=name.decode('utf-8'), phone=phone.decode('utf-8'), diet=diet.decode('utf-8'), message=message) except AttributeError: return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/users", methods=['GET', 'POST']) @check_session def list_users(): """ Users list """ page = str(request.url_rule) menu = get_menu(page) message = check_user_info() staffers = Tetawebapp_users.query.filter_by(is_admin=0).order_by(Tetawebapp_users.name).all() return render_template('list_users.html', menu=menu, staffers=staffers, message=message) @app.route("/account/", methods=['GET', 'POST']) @check_session def account_by_id(ID): """ Arcticles page """ try: if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) user = Tetawebapp_users.query.filter_by(id=ID).first() return render_template('account_by_id.html', menu=menu, user=user) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/account/update/", methods=['GET', 'POST']) @check_session def update_account_by_id(ID): """ Update given account """ try: if session['is_admin']: page = str(request.url_rule) menu = get_menu(page) login = session['login'] password = request.form.get('password').encode('utf-8') confirm = request.form.get('confirm').encode('utf-8') name = request.form.get('name').encode('utf-8') phone = request.form.get('phone').encode('utf-8') diet = request.form.get('diet').encode('utf-8') if update_user_by_id(ID, login, password, confirm, name, phone, diet): user = Tetawebapp_users.query.filter_by(id=ID).first() message = check_user_info() else: message = "Erreur lors de l'enregistrement des données." return render_template('account_by_id.html', menu=menu, user=user, message=message) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/account/delete/", methods=['GET', 'POST']) @check_session def delete_account(ID): """ Delete given account """ try: if session['is_admin']: message = "Erreur lors de la suppression.".decode('utf-8') if delete_user(ID): message = '' page = str(request.url_rule) menu = get_menu(page) staffers = Tetawebapp_users.query.filter_by(is_admin=0).order_by(Tetawebapp_users.name).all() return render_template('list_users.html', menu=menu, staffers=staffers, message=message) # User is not admin return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") except AttributeError: # User is not logged in return render_template('login_or_register.html', message="Utilisateur ou mot de passe invalide") @app.route("/logout", methods=['GET', 'POST']) @check_session def logout(): """ Logout user """ # Remove session token session['token'] = None session['login'] = None # Return user to index page response = app.make_response(render_template('login_or_register.html', message='')) # Push token to cookie sync_cookies(response, session) return response @app.route("/basics", methods=['GET', 'POST']) @check_session def basics(): """ Basics page """ page = str(request.url_rule) menu = get_menu(page) return render_template('basics.html', menu=menu) @app.route("/inputs", methods=['GET', 'POST']) @check_session def inputs(): """ Show the input collection """ page = str(request.url_rule) menu = get_menu(page) return render_template('inputs.html', menu=menu) @app.route("/ajax", methods=['GET', 'POST']) @check_session def ajax(): """ Propose various AJAX tests """ page = str(request.url_rule) menu = get_menu(page) return render_template('ajax.html', menu=menu) @app.route("/database", methods=['GET', 'POST']) @check_session def database(): """ A blah on using databases """ page = str(request.url_rule) menu = get_menu(page) return render_template('database.html', menu=menu) @app.route("/todo", methods=['GET', 'POST']) @check_session def todo(): """ The famous TODO list """ page = str(request.url_rule) menu = get_menu(page) return render_template('todo.html', menu=menu) ######################################################################## # AJAX routes ######################################################################## @app.route("/get_html_from_ajax", methods=['GET', 'POST']) @check_session def get_html_from_ajax(): """ Return HTML code to an AJAX request It may generate a 404 http error for testing purpose """ if int(random.random()*10) % 2: # Randomly generate 404 HTTP response return render_template('error.html'), 404 return render_template('ajax_html.html') @app.route("/get_value_from_ajax", methods=['GET', 'POST']) @check_session def get_value_from_ajax(): """ Return a randomly generated value to an AJAX request It may return an error code for testing purpose """ err_code = 'TETA_ERR' RND = int(random.random()*10) if RND % 2: # Randomly generate error return err_code return str(RND) @app.route("/set_value_from_ajax/", methods=['GET', 'POST']) @check_session def set_value_from_ajax(value): """ Accept a value from an AJAX request It may return an error code for testing purpose """ err_code = 'TETA_ERR' if value != 'We Make Porn': return 'True' return err_code @app.route("/upload", methods=['POST']) @check_session def upload(): """ Save a file from AJAX request Files are saved in UPLOADED_FILES_DEST (see config.local.py) """ err_code = 'TETA_ERR' RND = int(random.random()*10) if RND % 2: # Randomly generate error print err_code return err_code uploaded_files = [] if len(request.files) > 0 and request.files['files']: uploaded_files = request.files.getlist("files") print "Uploaded files:" for f in uploaded_files: print ' [+] %s [%s]' % (f.filename, f.content_type) # Before saving you should: # - Secure the filename # - Check file size # - Check content type f.save(os.path.join(app.config['UPLOADED_FILES_DEST'], f.filename)) f.close() return "OK" ######################################################################## # Main ######################################################################## if __name__ == '__main__': app.run(host='0.0.0.0')