114 lines
4.4 KiB
Python
Executable File
114 lines
4.4 KiB
Python
Executable File
#!/bin/env python3
|
|
|
|
import os
|
|
import wave
|
|
import math
|
|
import shout
|
|
import struct
|
|
import random
|
|
import argparse
|
|
import subprocess
|
|
|
|
def sine_wave(frequency=444, framerate=44100, amplitude=0.5, duration=1):
    """Return a tone as packed little-endian signed 16-bit PCM samples.

    frequency: the frequency to sample
    framerate: the sampling rate
    amplitude: the volume (from 0 to 1); the scaled value is clamped to
               the range 0..32767
    duration: the length of the sample in seconds
    """
    # Scale the 0..1 volume to the 16-bit range, then clamp it.
    peak = min(max(amplitude * 32767, 0), 32767)

    sample_count = int(duration * framerate)
    samples = [
        int(peak * math.cos(2 * frequency * math.pi * float(n) / float(framerate)))
        for n in range(sample_count)
    ]
    return struct.pack('<{}h'.format(len(samples)), *samples)
|
|
|
|
def notes_table(oord):
    """Map a character ordinal to a ``(frequency, duration)`` pair.

    The frequency is ``|floor(ln(oord) * adjust[0] - adjust[1])|`` where the
    two adjustment factors come from the module-level ``args.adjust``; the
    duration is ``args.duration`` unchanged.

    oord: the ordinal of the character; ``math.log`` rejects values <= 0.
    """
    # Take the logarithm first so an invalid ordinal fails before the
    # adjustment factors are even looked at.
    log_ord = math.log(oord)
    scale = int(args.adjust[0])
    offset = int(args.adjust[1])
    pitch = math.fabs(math.floor(log_ord * scale - offset))
    return (pitch, args.duration)
|
|
|
|
def convert2ogg(wav_file, ogg_file):
    """Encode a WAV file to Ogg by running the external ``oggenc`` program.

    The title tag (``-t``) is set to the log path taken from the
    module-level ``args.log``.

    wav_file: path of the WAV file to encode.
    ogg_file: path of the Ogg file to produce.

    Returns the exit status of ``oggenc`` (0 on success).
    """
    # Every option and every value must be its own argv element: no shell
    # is involved, so a single '-o X --utf8 -t Y' string would reach oggenc
    # as one argument and be taken verbatim as the value of -o.
    command = [
        'oggenc', wav_file,
        '-o', ogg_file,
        '--utf8',
        '-t', args.log,
    ]
    return subprocess.call(command)
|
|
|
|
def push_ogg(ogg_file):
    """Stream an Ogg file to the Icecast server given on the command line.

    Connection settings (host, port, user, password, mount, protocol) are
    read from the module-level ``args``. The file is sent in chunks of at
    most 4096 bytes, calling ``sync()`` after each ``send()``. The
    connection is closed when streaming ends, whether it succeeded or not.

    ogg_file: path of the Ogg file to stream.
    """
    icecast = shout.Shout()
    icecast.host = args.host
    icecast.port = args.port
    icecast.user = args.user
    icecast.password = args.password
    icecast.mount = args.mount
    icecast.protocol = args.protocol
    icecast.format = 'ogg'

    print("Connecting to {}://{}:{}/{}".format(args.protocol, args.host, args.port, args.mount))
    icecast.open()
    try:
        with open(ogg_file, 'rb') as ogg:
            total = 0
            print("- Reading 4096 bytes")
            new_buff = ogg.read(4096)
            while True:
                # The next chunk is read before the current one is sent.
                cur_buff = new_buff
                print("- Reading next 4096 bytes")
                new_buff = ogg.read(4096)
                total += len(cur_buff)
                if not cur_buff:
                    print(" - Buffer is empty: EOF")
                    print(" - Sent: {} bytes".format(total))
                    break
                # Report the real size: the last chunk is usually shorter.
                print("- Sending {} bytes".format(len(cur_buff)))
                icecast.send(cur_buff)
                icecast.sync()
    finally:
        # Always release the connection, even if reading or sending failed.
        icecast.close()
|
|
|
|
def main():
    """Turn the log file into tones, encode them to Ogg and stream the result.

    Each character of ``args.log`` becomes one tone written to
    ``<workdir>/<log stem>.wav``; the WAV file is then encoded to
    ``<workdir>/<log stem>.ogg`` and pushed to Icecast. Any error is printed
    in red instead of propagating.
    """
    # The stem is everything before the first dot of the log's base name.
    stem = os.path.basename(args.log).split('.')[0]
    wav_output = os.path.join(args.workdir, "{}.wav".format(stem))
    ogg_output = os.path.join(args.workdir, "{}.ogg".format(stem))

    try:
        with wave.open(wav_output, 'wb') as wave_file:
            wave_file.setnchannels(1)
            wave_file.setsampwidth(2)
            wave_file.setframerate(args.framerate)

            with open(args.log, 'r') as log:
                for letter in log.read():
                    oord = ord(letter)
                    # --exclude values arrive from the command line as
                    # strings, so match the character itself as well as
                    # its integer ordinal.
                    if oord in args.exclude or letter in args.exclude:
                        continue
                    # Computed only for kept characters, so excluding a
                    # character also avoids mapping it to a frequency.
                    freq, duration = notes_table(oord)
                    print("Caractère: {} | Ord: {} | Fréquence: {} | Durée: {}".format(letter, oord, freq, duration))
                    # Generate the samples at the same rate as the WAV
                    # header, and honour the requested volume.
                    wave_file.writeframes(sine_wave(
                        frequency=freq,
                        framerate=args.framerate,
                        amplitude=args.amplitude,
                        duration=duration,
                    ))

        # Encode only once the WAV file is closed, so the encoder reads a
        # complete file.
        result = convert2ogg(wav_output, ogg_output)
        if result != 0:
            raise Exception("Erreur lors de la conversion en OGG: {}".format(result))
        push_ogg(ogg_output)
    except Exception as e:
        print('\033[91m{}\033[0m'.format(e))
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point: parse the options into the module-level
    # ``args`` read by the functions above, then run the pipeline.
    parser = argparse.ArgumentParser()
    parser.add_argument('--log', help='Le fichier de log à traiter (requis)', type=str, required=True)
    parser.add_argument('--workdir', help='Le répertoire de travail (défaut: /tmp)', type=str, default='/tmp')
    # The help text now states the default that is actually applied (0.08).
    parser.add_argument('--duration', help="La durée d'une note (défaut: 0.08)", type=float, default=0.08)
    parser.add_argument('--amplitude', help="Le niveau sonore (défaut: 0.5)", type=float, default=0.5)
    parser.add_argument('--framerate', help="Le taux d'échantillonage (défaut: 44100)", type=int, default=44100)
    # type=int rejects non-integer factors at parse time instead of later,
    # when the first frequency is computed.
    parser.add_argument('--adjust', help="Facteurs d'ajustement (défaut: 5000 20000)", nargs=2, type=int, default=[5000, 20000])
    parser.add_argument('--exclude', help="Liste des caractères non traités (défaut: [])", nargs="*", default=[])
    parser.add_argument('--host', help="Server Icecast2 (requis)", type=str, required=True)
    parser.add_argument('--port', help="Port TCP sur lequel contacter le serveur Icecast2 (requis)", type=int, required=True)
    parser.add_argument('--user', help="Nom d'utilisateur (requis)", type=str, required=True)
    parser.add_argument('--password', help="Mot de passe (requis)", type=str, required=True)
    parser.add_argument('--mount', help="Point de montage Icecast2 (requis)", type=str, required=True)
    parser.add_argument('--protocol', help="Protocol à utiliser (défaut: http)", type=str, default='http')
    args = parser.parse_args()

    main()
|