Рубрики
Без рубрики

Работа с подпроцессами

Автор оригинала: Doug Hellmann.

Часто необходимо работать с другими программами и процессами, чтобы воспользоваться существующим кодом, не переписывая его или для доступа к библиотекам или функциям, не доступными в Python. Как и в случае с сетью ввода/вывода, <код> ASYNCIO включает в себя две абстракции для запуска другой программы, а затем взаимодействуя с ним.

Использование абстракции протокола с подпроцессами

В этом примере используется COROTINE для запуска процесса для запуска команды Unix <код> DF , чтобы найти свободное место на локальных дисках. Он использует SUPPOCESS_EXEC () Чтобы запустить процесс и завязать его к классу протокола, который знает, как прочитать <код> DF Вывод команды и анализа его. Методы класса протокола называются автоматически на основе событий ввода/вывода для подпроцесса. Поскольку оба <код> stdin и stderr аргументы устанавливаются на <код> none , эти каналы связи не подключены к новому процессу.

asyncio_subprocess_protocol.py

import asyncio
import functools


async def run_df(loop):
    print('in run_df')

    cmd_done  asyncio.Future(looploop)
    factory  functools.partial(DFProtocol, cmd_done)
    proc  loop.subprocess_exec(
        factory,
        'df', '-hl',
        stdinNone,
        stderrNone,
    )
    try:
        print('launching process')
        transport, protocol  await proc
        print('waiting for process to complete')
        await cmd_done
    finally:
        transport.close()

    return cmd_done.result()

Класс <Код> DFProtocol получен из <Код> SubprocessProtocol , который определяет API для класса для связи с другим процессом через трубы. Ожидается, что СДЕЛАНО BURGEL> BUSTORT , который вызывает, чтобы посмотреть процесс для завершения процесса.

class DFProtocol(asyncio.SubprocessProtocol):

    FD_NAMES  ['stdin', 'stdout', 'stderr']

    def __init__(self, done_future):
        self.done  done_future
        self.buffer  bytearray()
        super().__init__()

Как и в связи с сокетом, <код> Connection_Made () вызывается при настройке входных каналов к новому процессу. Аргумент Transport является экземпляром подкласса BASESUBPROCESSPRANSPORT . Он может прочитать вывод данных по процессу и записи данных в входной поток для процесса, если процесс был сконфигурирован для приема ввода.

def connection_made(self, transport):
        print('process started {}'.format(transport.get_pid()))
        self.transport  transport

Когда процесс сгенерировал выход, <код> PITOR_DATA_RECEED () вызывается с дескриптором файла, в котором были излучены данные, и фактические данные читаются с трубы. Класс протокола сохраняет выход из стандартного выходного канала процесса в буфере для последующей обработки.

def pipe_data_received(self, fd, data):
        print('read {} bytes from {}'.format(len(data),
                                             self.FD_NAMES[fd]))
        if fd  1:
            self.buffer.extend(data)

Когда процесс завершается, <код> process_Exted () вызывается. Выходной код процесса доступен от транспортного объекта, вызывая <код> get_returnCode () . В этом случае, если нет ошибок, не указано, доступный вывод декодируется и анализируется перед возвращением через <код> будущего . Если есть ошибка, результаты предполагаются пустыми. Установка результата будущего говорит <код> run_df () , что процесс выступил, поэтому он очищает, а затем возвращает результаты.

def process_exited(self):
        print('process exited')
        return_code  self.transport.get_returncode()
        print('return code {}'.format(return_code))
        if not return_code:
            cmd_output  bytes(self.buffer).decode()
            results  self._parse_results(cmd_output)
        else:
            results  []
        self.done.set_result((return_code, results))

Вывод команд анализируется в последовательности словарей, отображающих имен заголовка для их значений для каждой строки вывода, а полученный список возвращается.

def _parse_results(self, output):
        print('parsing results')
        # Output has one row of headers, all single words.  The
        # remaining rows are one per filesystem, with columns
        # matching the headers (assuming that none of the
        # mount points have whitespace in the names).
        if not output:
            return []
        lines  output.splitlines()
        headers  lines[0].split()
        devices  lines[1:]
        results  [
            dict(zip(headers, line.split()))
            for line in devices
        ]
        return results

RUN_DF () COROUTINE работает с использованием RUN_UNTIL_COMPLETE () , то результаты рассмотрены, и напечатано свободное пространство на каждом устройстве.

event_loop  asyncio.get_event_loop()
try:
    return_code, results  event_loop.run_until_complete(
        run_df(event_loop)
    )
finally:
    event_loop.close()

if return_code:
    print('error exit {}'.format(return_code))
else:
    print('\nFree space:')
    for r in results:
        print('{Mounted:25}: {Avail}'.format(**r))

Вывод ниже показывает последовательность предпринимаемых шагов, а свободное пространство на трех дисках на системе, где она была запущена.

$ python3 asyncio_subprocess_protocol.py

in run_df
launching process
process started 49675
waiting for process to complete
read 332 bytes from stdout
process exited
return code 0
parsing results

Free space:
/                        : 233Gi
/Volumes/hubertinternal  : 157Gi
/Volumes/hubert-tm       : 2.3Ti

Вызов подпроцессов с Coroutines и Streams

Чтобы использовать COROUTINES для выполнения процесса напрямую, вместо того, чтобы доступа к него через <код> Протокол Subclass, вызов Create_SubProcess_Exec () и указать, какой из <код> stdout код> STDERR и <код> stdin для подключения к трубе. Результат COROUTINE для порождения подпроцесса является <код> процесса , который можно использовать для манипулирования подпроцессором или взаимодействовать с ним.

asyncio_subprocess_coroutine.py

import asyncio
import asyncio.subprocess


async def run_df():
    print('in run_df')

    buffer  bytearray()

    create  asyncio.create_subprocess_exec(
        'df', '-hl',
        stdoutasyncio.subprocess.PIPE,
    )
    print('launching process')
    proc  await create
    print('process started {}'.format(proc.pid))

В этом примере <код> DF не нуждается в вводе, кроме его аргументов командной строки, поэтому следующий шаг предназначен для чтения всех выходов. С помощью протокола <код> нет контроля над тем, сколько данных прочитано за раз. В этом примере используется <код> readline () , но он также может звонить READ () непосредственно для чтения данных, которые не ориентированы на линейке. Выходной сигнал команды буферизается, как при примере протокола, поэтому его можно проанализировать позже.

while True:
        line  await proc.stdout.readline()
        print('read {!r}'.format(line))
        if not line:
            print('no more output from command')
            break
        buffer.extend(line)

Метод readline () Readline () возвращает пустую строку байтов, когда больше нет вывода, потому что программа закончилась. Для обеспечения правильной очистки процесса следующий шаг – ждать, пока процесс полностью выходит.

print('waiting for process to complete')
    await proc.wait()

В этот момент может быть осмотрен статус выхода, чтобы определить, разбирают ли вывод или обрабатывать ошибку, поскольку она не производила вывода. Логика анализа такая же, как и в предыдущем примере, но находится в отдельной функции (не отображается здесь), поскольку нет класса протокола, чтобы скрыть его. После того, как данные проанализированы, результаты и выходной код затем возвращаются к абонеру.

return_code  proc.returncode
    print('return code {}'.format(return_code))
    if not return_code:
        cmd_output  bytes(buffer).decode()
        results  _parse_results(cmd_output)
    else:
        results  []

    return (return_code, results)

Основная программа выглядит аналогичной приведению на основе протокола, поскольку изменения реализации изолированы в RUN_DF () .

event_loop  asyncio.get_event_loop()
try:
    return_code, results  event_loop.run_until_complete(
        run_df()
    )
finally:
    event_loop.close()

if return_code:
    print('error exit {}'.format(return_code))
else:
    print('\nFree space:')
    for r in results:
        print('{Mounted:25}: {Avail}'.format(**r))

Поскольку вывод из <Код> DF можно прочитать одну строку за раз, он повторяется, чтобы показать прогресс программы. В противном случае вывод выглядит похоже на предыдущий пример.

$ python3 asyncio_subprocess_coroutine.py

in run_df
launching process
process started 49678
read b'Filesystem     Size   Used  Avail Capacity   iused
ifree %iused  Mounted on\n'
read b'/dev/disk2s2  446Gi  213Gi  233Gi    48%  55955082
61015132   48%   /\n'
read b'/dev/disk1    465Gi  307Gi  157Gi    67%  80514922
41281172   66%   /Volumes/hubertinternal\n'
read b'/dev/disk3s2  3.6Ti  1.4Ti  2.3Ti    38% 181837749
306480579   37%   /Volumes/hubert-tm\n'
read b''
no more output from command
waiting for process to complete
return code 0
parsing results

Free space:
/                        : 233Gi
/Volumes/hubertinternal  : 157Gi
/Volumes/hubert-tm       : 2.3Ti

Отправка данных на подпрокат

Оба предыдущих примера использовали только один канал связи для чтения данных из второго процесса. Часто нужно отправлять данные в команду для обработки. Этот пример определяет COROUTINE, чтобы выполнить команду UNIX <код> TR для перевода символов в его входном потоке. В этом случае TR используется для преобразования строчных букв в заглавные буквы.

<Код> to_eupper () Coroutine принимает аргумент в качестве аргумента величина и входной строки. Это порождает второй процесс, работающий <код> «TR [: Нижние:] [: Верхний:]« .

asyncio_subprocess_coroutine_write.py

import asyncio
import asyncio.subprocess


async def to_upper(input):
    print('in to_upper')

    create  asyncio.create_subprocess_exec(
        'tr', '[:lower:]', '[:upper:]',
        stdoutasyncio.subprocess.PIPE,
        stdinasyncio.subprocess.PIPE,
    )
    print('launching process')
    proc  await create
    print('pid {}'.format(proc.pid))

Следующий <код> to_upper () использует обмениваться () метод <код> , чтобы отправить входную строку в команду и прочитать все полученные результаты, асинхронно. Как и в случае с Subprocess.POPEN версия того же метода, Comment () возвращает полные строки выходных байтов. Если команда, скорее всего, будет создавать больше данных, чем может удобно вписаться в память, вход не может быть произведен все сразу, или вывод должен быть обработан постепенно, можно использовать ручки stdin, stdout и STDERR > Процесс напрямую вместо звонка <код> обменивается () .

print('communicating with process')
    stdout, stderr  await proc.communicate(input.encode())

После того, как ввод/вывод, ожидание того, что процесс полностью выхода гарантирует, что он правильно очищен.

print('waiting for process to complete')
    await proc.wait()

Затем необходимо изучить код возврата, а выводная байтовая строка декодировала, для приготовления возвращаемого значения от COROUTINE.

return_code  proc.returncode
    print('return code {}'.format(return_code))
    if not return_code:
        results  bytes(stdout).decode()
    else:
        results  ''

    return (return_code, results)

Основная часть программы устанавливает строку сообщений, которая будет преобразована, а затем устанавливает цикл события для запуска <код> to_upper () и печатает результаты.

MESSAGE  """
This message will be converted
to all caps.
"""

event_loop  asyncio.get_event_loop()
try:
    return_code, results  event_loop.run_until_complete(
        to_upper(MESSAGE)
    )
finally:
    event_loop.close()

if return_code:
    print('error exit {}'.format(return_code))
else:
    print('Original: {!r}'.format(MESSAGE))
    print('Changed : {!r}'.format(results))

Вывод показывает последовательность операций, а затем преобразуется простое текстовое сообщение.

$ python3 asyncio_subprocess_coroutine_write.py

in to_upper
launching process
pid 49684
communicating with process
waiting for process to complete
return code 0
Original: '\nThis message will be converted\nto all caps.\n'
Changed : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'