Автор оригинала: Doug Hellmann.
Часто необходимо работать с другими программами и процессами, чтобы воспользоваться существующим кодом, не переписывая его или для доступа к библиотекам или функциям, не доступными в Python. Как и в случае с сетью ввода/вывода, <код> ASYNCIO включает в себя две абстракции для запуска другой программы, а затем взаимодействуя с ним.
Использование абстракции протокола с подпроцессами
В этом примере используется COROTINE для запуска процесса для запуска команды Unix <код> DF , чтобы найти свободное место на локальных дисках. Он использует SUPPOCESS_EXEC ()
Чтобы запустить процесс и завязать его к классу протокола, который знает, как прочитать <код> DF Вывод команды и анализа его. Методы класса протокола называются автоматически на основе событий ввода/вывода для подпроцесса. Поскольку оба <код> stdin и stderr
аргументы устанавливаются на <код> none , эти каналы связи не подключены к новому процессу.
asyncio_subprocess_protocol.py
import asyncio import functools async def run_df(loop): print('in run_df') cmd_done asyncio.Future(looploop) factory functools.partial(DFProtocol, cmd_done) proc loop.subprocess_exec( factory, 'df', '-hl', stdinNone, stderrNone, ) try: print('launching process') transport, protocol await proc print('waiting for process to complete') await cmd_done finally: transport.close() return cmd_done.result()
Класс <Код> DFProtocol получен из <Код> SubprocessProtocol , который определяет API для класса для связи с другим процессом через трубы. Ожидается, что СДЕЛАНО
BURGEL> BUSTORT , который вызывает, чтобы посмотреть процесс для завершения процесса.
class DFProtocol(asyncio.SubprocessProtocol): FD_NAMES ['stdin', 'stdout', 'stderr'] def __init__(self, done_future): self.done done_future self.buffer bytearray() super().__init__()
Как и в связи с сокетом, <код> Connection_Made () вызывается при настройке входных каналов к новому процессу. Аргумент Transport
является экземпляром подкласса BASESUBPROCESSPRANSPORT
. Он может прочитать вывод данных по процессу и записи данных в входной поток для процесса, если процесс был сконфигурирован для приема ввода.
def connection_made(self, transport): print('process started {}'.format(transport.get_pid())) self.transport transport
Когда процесс сгенерировал выход, <код> PITOR_DATA_RECEED () вызывается с дескриптором файла, в котором были излучены данные, и фактические данные читаются с трубы. Класс протокола сохраняет выход из стандартного выходного канала процесса в буфере для последующей обработки.
def pipe_data_received(self, fd, data): print('read {} bytes from {}'.format(len(data), self.FD_NAMES[fd])) if fd 1: self.buffer.extend(data)
Когда процесс завершается, <код> process_Exted () вызывается. Выходной код процесса доступен от транспортного объекта, вызывая <код> get_returnCode () . В этом случае, если нет ошибок, не указано, доступный вывод декодируется и анализируется перед возвращением через <код> будущего . Если есть ошибка, результаты предполагаются пустыми. Установка результата будущего говорит <код> run_df () , что процесс выступил, поэтому он очищает, а затем возвращает результаты.
def process_exited(self): print('process exited') return_code self.transport.get_returncode() print('return code {}'.format(return_code)) if not return_code: cmd_output bytes(self.buffer).decode() results self._parse_results(cmd_output) else: results [] self.done.set_result((return_code, results))
Вывод команд анализируется в последовательности словарей, отображающих имен заголовка для их значений для каждой строки вывода, а полученный список возвращается.
def _parse_results(self, output): print('parsing results') # Output has one row of headers, all single words. The # remaining rows are one per filesystem, with columns # matching the headers (assuming that none of the # mount points have whitespace in the names). if not output: return [] lines output.splitlines() headers lines[0].split() devices lines[1:] results [ dict(zip(headers, line.split())) for line in devices ] return results
RUN_DF ()
COROUTINE работает с использованием RUN_UNTIL_COMPLETE ()
, то результаты рассмотрены, и напечатано свободное пространство на каждом устройстве.
event_loop asyncio.get_event_loop() try: return_code, results event_loop.run_until_complete( run_df(event_loop) ) finally: event_loop.close() if return_code: print('error exit {}'.format(return_code)) else: print('\nFree space:') for r in results: print('{Mounted:25}: {Avail}'.format(**r))
Вывод ниже показывает последовательность предпринимаемых шагов, а свободное пространство на трех дисках на системе, где она была запущена.
$ python3 asyncio_subprocess_protocol.py in run_df launching process process started 49675 waiting for process to complete read 332 bytes from stdout process exited return code 0 parsing results Free space: / : 233Gi /Volumes/hubertinternal : 157Gi /Volumes/hubert-tm : 2.3Ti
Вызов подпроцессов с Coroutines и Streams
Чтобы использовать COROUTINES для выполнения процесса напрямую, вместо того, чтобы доступа к него через <код> Протокол Subclass, вызов Create_SubProcess_Exec ()
и указать, какой из <код> stdout код> STDERR и <код> stdin для подключения к трубе. Результат COROUTINE для порождения подпроцесса является <код> процесса , который можно использовать для манипулирования подпроцессором или взаимодействовать с ним.
asyncio_subprocess_coroutine.py
import asyncio import asyncio.subprocess async def run_df(): print('in run_df') buffer bytearray() create asyncio.create_subprocess_exec( 'df', '-hl', stdoutasyncio.subprocess.PIPE, ) print('launching process') proc await create print('process started {}'.format(proc.pid))
В этом примере <код> DF не нуждается в вводе, кроме его аргументов командной строки, поэтому следующий шаг предназначен для чтения всех выходов. С помощью протокола <код> нет контроля над тем, сколько данных прочитано за раз. В этом примере используется <код> readline () , но он также может звонить READ ()
непосредственно для чтения данных, которые не ориентированы на линейке. Выходной сигнал команды буферизается, как при примере протокола, поэтому его можно проанализировать позже.
while True: line await proc.stdout.readline() print('read {!r}'.format(line)) if not line: print('no more output from command') break buffer.extend(line)
Метод readline () Readline () возвращает пустую строку байтов, когда больше нет вывода, потому что программа закончилась. Для обеспечения правильной очистки процесса следующий шаг – ждать, пока процесс полностью выходит.
print('waiting for process to complete') await proc.wait()
В этот момент может быть осмотрен статус выхода, чтобы определить, разбирают ли вывод или обрабатывать ошибку, поскольку она не производила вывода. Логика анализа такая же, как и в предыдущем примере, но находится в отдельной функции (не отображается здесь), поскольку нет класса протокола, чтобы скрыть его. После того, как данные проанализированы, результаты и выходной код затем возвращаются к абонеру.
return_code proc.returncode print('return code {}'.format(return_code)) if not return_code: cmd_output bytes(buffer).decode() results _parse_results(cmd_output) else: results [] return (return_code, results)
Основная программа выглядит аналогичной приведению на основе протокола, поскольку изменения реализации изолированы в RUN_DF ()
.
event_loop asyncio.get_event_loop() try: return_code, results event_loop.run_until_complete( run_df() ) finally: event_loop.close() if return_code: print('error exit {}'.format(return_code)) else: print('\nFree space:') for r in results: print('{Mounted:25}: {Avail}'.format(**r))
Поскольку вывод из <Код> DF можно прочитать одну строку за раз, он повторяется, чтобы показать прогресс программы. В противном случае вывод выглядит похоже на предыдущий пример.
$ python3 asyncio_subprocess_coroutine.py in run_df launching process process started 49678 read b'Filesystem Size Used Avail Capacity iused ifree %iused Mounted on\n' read b'/dev/disk2s2 446Gi 213Gi 233Gi 48% 55955082 61015132 48% /\n' read b'/dev/disk1 465Gi 307Gi 157Gi 67% 80514922 41281172 66% /Volumes/hubertinternal\n' read b'/dev/disk3s2 3.6Ti 1.4Ti 2.3Ti 38% 181837749 306480579 37% /Volumes/hubert-tm\n' read b'' no more output from command waiting for process to complete return code 0 parsing results Free space: / : 233Gi /Volumes/hubertinternal : 157Gi /Volumes/hubert-tm : 2.3Ti
Отправка данных на подпрокат
Оба предыдущих примера использовали только один канал связи для чтения данных из второго процесса. Часто нужно отправлять данные в команду для обработки. Этот пример определяет COROUTINE, чтобы выполнить команду UNIX <код> TR для перевода символов в его входном потоке. В этом случае TR используется для преобразования строчных букв в заглавные буквы.
<Код> to_eupper () Coroutine принимает аргумент в качестве аргумента величина и входной строки. Это порождает второй процесс, работающий <код> «TR [: Нижние:] [: Верхний:]« .
asyncio_subprocess_coroutine_write.py
import asyncio import asyncio.subprocess async def to_upper(input): print('in to_upper') create asyncio.create_subprocess_exec( 'tr', '[:lower:]', '[:upper:]', stdoutasyncio.subprocess.PIPE, stdinasyncio.subprocess.PIPE, ) print('launching process') proc await create print('pid {}'.format(proc.pid))
Следующий <код> to_upper () использует обмениваться () метод <код> , чтобы отправить входную строку в команду и прочитать все полученные результаты, асинхронно. Как и в случае с Subprocess.POPEN версия того же метода, Comment ()
возвращает полные строки выходных байтов. Если команда, скорее всего, будет создавать больше данных, чем может удобно вписаться в память, вход не может быть произведен все сразу, или вывод должен быть обработан постепенно, можно использовать ручки stdin, stdout и STDERR > Процесс напрямую вместо звонка <код> обменивается () .
print('communicating with process') stdout, stderr await proc.communicate(input.encode())
После того, как ввод/вывод, ожидание того, что процесс полностью выхода гарантирует, что он правильно очищен.
print('waiting for process to complete') await proc.wait()
Затем необходимо изучить код возврата, а выводная байтовая строка декодировала, для приготовления возвращаемого значения от COROUTINE.
return_code proc.returncode print('return code {}'.format(return_code)) if not return_code: results bytes(stdout).decode() else: results '' return (return_code, results)
Основная часть программы устанавливает строку сообщений, которая будет преобразована, а затем устанавливает цикл события для запуска <код> to_upper () и печатает результаты.
MESSAGE """ This message will be converted to all caps. """ event_loop asyncio.get_event_loop() try: return_code, results event_loop.run_until_complete( to_upper(MESSAGE) ) finally: event_loop.close() if return_code: print('error exit {}'.format(return_code)) else: print('Original: {!r}'.format(MESSAGE)) print('Changed : {!r}'.format(results))
Вывод показывает последовательность операций, а затем преобразуется простое текстовое сообщение.
$ python3 asyncio_subprocess_coroutine_write.py in to_upper launching process pid 49684 communicating with process waiting for process to complete return code 0 Original: '\nThis message will be converted\nto all caps.\n' Changed : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'