吃鸡游戏
正如这些看似简单的问题似乎总是如此,魔鬼在细节中。我最终写了一些代码来解决这个问题。pip 依赖项可以使用python3 -m pip install ffmpeg-python PyOpenAL. 代码的工作流程可以分为两步:代码必须从在线流中下载 mp3 文件数据的二进制块,并将它们转换为原始 PCM 数据(基本上是签名的 uint16_t 幅度值)以进行播放。这是使用ffmpeg-python 库完成的,它是FFmpeg的包装器。此包装器在单独的进程中运行 FFmpeg,因此此处不会发生阻塞。然后代码必须将这些块排队以进行播放。这是使用PyOpenAL完成的,它是OpenAL的包装器。在创建设备和上下文以启用音频播放后,将创建一个 3d 定位的源。这个源不断地与缓冲区(模拟“环形缓冲区”)一起排队,这些缓冲区填充了从 FFmpeg 输入的数据。这与第一步在单独的线程上运行,使下载新音频块独立于音频块播放运行。这是该代码的样子(带有一些注释)。如果您对代码或此答案的任何其他部分有任何疑问,请告诉我。import ctypesimport ffmpegimport numpy as npfrom openal.al import *from openal.alc import *from queue import Queue, Emptyfrom threading import Threadimport timefrom urllib.request import urlopendef init_audio(): #Create an OpenAL device and context. device_name = alcGetString(None, ALC_DEFAULT_DEVICE_SPECIFIER) device = alcOpenDevice(device_name) context = alcCreateContext(device, None) alcMakeContextCurrent(context) return (device, context)def create_audio_source(): #Create an OpenAL source. source = ctypes.c_uint() alGenSources(1, ctypes.pointer(source)) return sourcedef create_audio_buffers(num_buffers): #Create a ctypes array of OpenAL buffers. buffers = (ctypes.c_uint * num_buffers)() buffers_ptr = ctypes.cast( ctypes.pointer(buffers), ctypes.POINTER(ctypes.c_uint), ) alGenBuffers(num_buffers, buffers_ptr) return buffers_ptrdef fill_audio_buffer(buffer_id, chunk): #Fill an OpenAL buffer with a chunk of PCM data. alBufferData(buffer_id, AL_FORMAT_STEREO16, chunk, len(chunk), 44100)def get_audio_chunk(process, chunk_size): #Fetch a chunk of PCM data from the FFMPEG process. return process.stdout.read(chunk_size)def play_audio(process): #Queues up PCM chunks for playing through OpenAL num_buffers = 4 chunk_size = 8192 device, context = init_audio() source = create_audio_source() buffers = create_audio_buffers(num_buffers) #Initialize the OpenAL buffers with some chunks for i in range(num_buffers): buffer_id = ctypes.c_uint(buffers[i]) chunk = get_audio_chunk(process, chunk_size) fill_audio_buffer(buffer_id, chunk) #Queue the OpenAL buffers into the OpenAL source and start playing sound! alSourceQueueBuffers(source, num_buffers, buffers) alSourcePlay(source) num_used_buffers = ctypes.pointer(ctypes.c_int()) while True: #Check if any buffers are used up/processed and refill them with data. alGetSourcei(source, AL_BUFFERS_PROCESSED, num_used_buffers) if num_used_buffers.contents.value != 0: used_buffer_id = ctypes.c_uint() used_buffer_ptr = ctypes.pointer(used_buffer_id) alSourceUnqueueBuffers(source, 1, used_buffer_ptr) chunk = get_audio_chunk(process, chunk_size) fill_audio_buffer(used_buffer_id, chunk) alSourceQueueBuffers(source, 1, used_buffer_ptr)if __name__ == "__main__": url = "http://icecast.spc.org:8000/longplayer" #Run FFMPEG in a separate process using subprocess, so it is non-blocking process = ( ffmpeg .input(url) .output("pipe:", format='s16le', acodec='pcm_s16le', ac=2, ar=44100, loglevel="quiet") .run_async(pipe_stdout=True) ) #Run audio playing OpenAL code in a separate thread thread = Thread(target=play_audio, args=(process,), daemon=True) thread.start() #Some example code to show that this is not being blocked by the audio. start = time.time() while True: print(time.time() - start)
慕姐8265434
使用pyminiaudio : (它提供了一个 icecast 流源类):import miniaudiodef title_printer(client: miniaudio.IceCastClient, new_title: str) -> None: print("Stream title: ", new_title)with miniaudio.IceCastClient("http://icecast.spc.org:8000/longplayer", update_stream_title=title_printer) as source: print("Connected to internet stream, audio format:", source.audio_format.name) print("Station name: ", source.station_name) print("Station genre: ", source.station_genre) print("Press <enter> to quit playing.\n") stream = miniaudio.stream_any(source, source.audio_format) with miniaudio.PlaybackDevice() as device: device.start(stream) input() # wait for user input, stream plays in background