#!/usr/bin/env python3 import os import wave import math import shout import struct import random import argparse import subprocess def sine_wave(frequency=444, framerate=44100, amplitude=0.5, duration=1): """ frequency: La fréquence à échantillonner framerate: Le taux d'échantillonnage amplitude: Le volume sonore (de 0 à 1) duration: La durée de l'échantillon en seconde """ amplitude = amplitude * 32767 if amplitude > 32767: amplitude = 32767 elif amplitude < 0: amplitude = 0 data = [] for i in range(int(duration * framerate)): value = int(amplitude*math.cos(2*frequency*math.pi*float(i)/float(framerate))) data.append(value) return struct.pack('<{}h'.format(len(data)), *data) def notes_table(oord): freq = math.fabs(math.floor(math.log(oord)*int(args.adjust[0])-int(args.adjust[1]))) duration = args.duration return (freq, duration) def convert2ogg(wav_file, ogg_file): result = subprocess.call(['oggenc', wav_file, '-o', '{}'.format(ogg_file)]) return result def push_ogg(ogg_file): icecast = shout.Shout() icecast.host = args.host icecast.port = args.port icecast.user = args.user icecast.password = args.password icecast.mount = args.mount icecast.protocol = args.protocol icecast.format = 'ogg' print("Connexion au serveur {}://{}:{} {}".format(args.protocol, args.host, args.port, args.mount)) print("Taille du tampon: {} octets".format(args.ice_buffer)) icecast.open() with open(ogg_file, 'rb') as ogg: total = 0 if args.debug: print("- Lecture de {} octets".format(args.ice_buffer)) new_buff = ogg.read(args.ice_buffer) while True: cur_buff = new_buff if args.debug: print("- Lecture des {} octets suivants".format(args.ice_buffer)) new_buff = ogg.read(args.ice_buffer) total += len(cur_buff) if len(cur_buff) == 0: print("- Tampon vide => Fin du fichier") print("- Nombre d'octets envoyés: {}".format(total)) break if args.debug: print("- Envoi de {} octets".format(args.ice_buffer)) icecast.send(cur_buff) icecast.sync() def main(): wav_output = "{}.wav".format(os.path.basename(args.log).split('.')[0]) wav_output = os.path.join(args.workdir, wav_output) ogg_output = "{}.ogg".format(os.path.basename(args.log).split('.')[0]) ogg_output = os.path.join(args.workdir, ogg_output) with wave.open(wav_output, 'wb') as wave_file: nchannels = 1 sampwidth = 2 framerate = args.framerate wave_file.setnchannels(nchannels) wave_file.setsampwidth(sampwidth) wave_file.setframerate(framerate) try: with open(args.log, 'r') as log: for letter in log.read(): oord = ord(letter) freq, duration = notes_table(oord) if oord not in args.exclude: if args.debug: print("Caractère: {} | Ord: {} | Fréquence: {} | Durée: {}".format(letter, oord, freq, duration)) wave_file.writeframes(sine_wave(frequency=freq, duration=duration)) result = convert2ogg(wav_output, ogg_output) if result == 0: push_ogg(ogg_output) else: raise Exception("Erreur lors de la conversion en OGG: {}".format(result)) except Exception as e: print('\033[91m{}\033[0m'.format(e)) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--log', help='Le fichier de log à traiter (requis)', type=str, required=True) parser.add_argument('--host', help="Server Icecast2 (requis)", type=str, required=True) parser.add_argument('--port', help="Port TCP sur lequel contacter le serveur Icecast2 (requis)", type=int, required=True) parser.add_argument('--user', help="Nom d'utilisateur (requis)", type=str, required=True) parser.add_argument('--password', help="Mot de passe (requis)", type=str, required=True) parser.add_argument('--mount', help="Point de montage Icecast2 (requis)", type=str, required=True) parser.add_argument('--workdir', help='Le répertoire de travail (défaut: /tmp)', type=str, default='/tmp') parser.add_argument('--duration', help="La durée d'une note (défaut: 0.008)", type=float, default=0.08) parser.add_argument('--amplitude', help="Le niveau sonore (défaut: 0.5)", type=float, default=0.5) parser.add_argument('--framerate', help="Le taux d'échantillonage (défaut: 44100)", type=int, default=44100) parser.add_argument('--adjust', help="Facteurs d'ajustement (défaut: 5000 20000)", nargs=2, default=[5000, 20000]) parser.add_argument('--exclude', help="Liste des caractères non traités (défaut: [])", nargs="*", default=[]) parser.add_argument('--protocol', help="Protocole à utiliser (défaut: http)", type=str, default='http') parser.add_argument('--ice_buffer', help="Taille du tampon Icecast (défaut: 32768)", type=int, default=32768) parser.add_argument('--debug', help="Affiche l'activité (défaut: False)", type=bool, default=False) args = parser.parse_args() main()