Рубрики
Без рубрики

wxpython и нити

Получите практические, реальные навыки Python на наших ресурсах и пути

Автор оригинала: Mike Driscoll.

Если вы используете GUI в Python много, то вы знаете, что иногда вам нужно выполнить некоторое время длительного проработания. Конечно, если вы сделаете это, так как вы будете с программой командной строки, вы будете удивляться. В большинстве случаев вы в конечном итоге блокируете петлю событий вашего GUI, и пользователь увидит вашу замораживание программы. Что вы можете сделать, чтобы обойти только с мишапами? Начните задачу в другом потоке или процессе курса! В этой статье мы посмотрим, как это сделать с модулем Threading Wxpython и Python.

WxPython Thread Безопасные методы

В мире WXPYPHON есть три связанных метода «Threadsafe». Если вы не используете один из этих трех, когда вы перейдете на обновление вашего пользовательского интерфейса, вы можете испытать странные проблемы. Иногда ваш графический интерфейс будет работать просто хорошо. В других случаях он не покиснет питона без видимой причины. Таким образом, необходимость методов потоков. Вот три метода безопасных потоков, которые Wxpython предоставляет:

  • WX.PostEvent
  • wx.callafter.
  • wx.calllater.

По словам Робина Данна (создатель WxPython), WX.callafter использует WX.PostEvent для отправки события на объект приложения. Приложение будет иметь обработчик событий, связанный с этим событием, и будет реагировать в соответствии с тем, что программист закодирован при получении события. Я понимаю, что wx.calllater вызывает wx.callafter с указанным ограничением времени, чтобы вы могли сказать ему, как долго ждать, прежде чем отправлять мероприятие.

Робин Данн также отметил, что блокировка глобального интерпретатора Python (GIL) предотвратит более одного потока для выполнения Bytecodes Python одновременно, что может ограничить, сколько ядер CPU используются вашей программой. На флип-стороне он также сказал, что «WXPYPHON выпускает GIL при создании вызовов на WX API, чтобы другие нити могли работать в то время». Другими словами, ваш пробег может варьироваться в зависимости от использования ниток на многокатечных машинах. Я нашел эту дискуссию, чтобы быть интересным и запутанным …

Во всяком случае, что это значит в отношении трех WX-методов, заключается в том, что WX.CallLater является наиболее абстрактным методом ThreadsAfe с WX.callafter Next и WX.PostEvent, являющийся самым низким уровнем. В следующих примерах вы увидите, как использовать WX.callafter и WX.Postevent для обновления программы WXPYPHON.

wxpython, резьба, wx.callafter и pubsub

В списке рассылки WXPYPHON вы увидите, как эксперты говорят другим использовать WX.callafter вместе с PubSub, чтобы общаться со своими приложениями WXPYPHON из другого потока. Я, наверное, даже сказал людям делать это. Так что в следующем примере это именно то, что мы собираемся сделать:

import time
import wx

from threading import Thread
from wx.lib.pubsub import Publisher

########################################################################
class TestThread(Thread):
    """Test Worker Thread Class."""
        
    #----------------------------------------------------------------------
    def __init__(self):
        """Init Worker Thread Class."""
        Thread.__init__(self)
        self.start()    # start the thread

    #----------------------------------------------------------------------
    def run(self):
        """Run Worker Thread."""
        # This is the code executing in the new thread.
        for i in range(6):
            time.sleep(10)
            wx.CallAfter(self.postTime, i)
        time.sleep(5)
        wx.CallAfter(Publisher().sendMessage, "update", "Thread finished!")
            
    #----------------------------------------------------------------------
    def postTime(self, amt):
        """
        Send time to GUI
        """
        amtOfTime = (amt + 1) * 10
        Publisher().sendMessage("update", amtOfTime)
        
########################################################################
class MyForm(wx.Frame):
 
    #----------------------------------------------------------------------
    def __init__(self):
        wx.Frame.__init__(self, None, wx.ID_ANY, "Tutorial")
 
        # Add a panel so it looks the correct on all platforms
        panel = wx.Panel(self, wx.ID_ANY)
        self.displayLbl = wx.StaticText(panel, label="Amount of time since thread started goes here")
        self.btn = btn = wx.Button(panel, label="Start Thread")

        btn.Bind(wx.EVT_BUTTON, self.onButton)

        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.displayLbl, 0, wx.ALL|wx.CENTER, 5)
        sizer.Add(btn, 0, wx.ALL|wx.CENTER, 5)
        panel.SetSizer(sizer)
        
        # create a pubsub receiver
        Publisher().subscribe(self.updateDisplay, "update")
        
    #----------------------------------------------------------------------
    def onButton(self, event):
        """
        Runs the thread
        """
        TestThread()
        self.displayLbl.SetLabel("Thread started!")
        btn = event.GetEventObject()
        btn.Disable()
    
    #----------------------------------------------------------------------
    def updateDisplay(self, msg):
        """
        Receives data from thread and updates the display
        """
        t = msg.data
        if isinstance(t, int):
            self.displayLbl.SetLabel("Time since thread started: %s seconds" % t)
        else:
            self.displayLbl.SetLabel("%s" % t)
            self.btn.Enable()
            
#----------------------------------------------------------------------
# Run the program
if __name__ == "__main__":
    app = wx.PySimpleApp()
    frame = MyForm().Show()
    app.MainLoop()

Мы будем использовать Python’s время Модуль, чтобы подделать наш длительный процесс. Однако не стесняйтесь ставить что-то лучше на его месте. В примере реальной жизни я использую нить для открытия Adobe Reader и отправить PDF на принтер. Это может не казаться чем-то особенным, но когда я не использовал нить, кнопка печати в моем приложении будет застреваться, пока документ был отправлен на принтер, и мой GUI просто повесил, пока это не было сделано. Даже второй или два заметны для пользователя!

Во всяком случае, давайте посмотрим, как это работает. В нашем классе потоков (воспроизведены ниже), мы переопределяем метод «RUN», чтобы оно делает то, что мы хотим. Эта тема запускается, когда мы создали его, потому что у нас есть «Self.Start ()» в своем методе __init__. В методе «RUN» мы верете в диапазоне 6, сном в течение 10 секунд между итерациями, а затем обновляем наш пользовательский интерфейс с помощью WX.callafter и Pubsub. Когда петли заканчивается, мы отправляем окончательное сообщение на наше приложение, чтобы пользователь узнал, что произошло.

########################################################################
class TestThread(Thread):
    """Test Worker Thread Class."""
        
    #----------------------------------------------------------------------
    def __init__(self):
        """Init Worker Thread Class."""
        Thread.__init__(self)
        self.start()    # start the thread

    #----------------------------------------------------------------------
    def run(self):
        """Run Worker Thread."""
        # This is the code executing in the new thread.
        for i in range(6):
            time.sleep(10)
            wx.CallAfter(self.postTime, i)
        time.sleep(5)
        wx.CallAfter(Publisher().sendMessage, "update", "Thread finished!")
            
    #----------------------------------------------------------------------
    def postTime(self, amt):
        """
        Send time to GUI
        """
        amtOfTime = (amt + 1) * 10
        Publisher().sendMessage("update", amtOfTime)

Вы заметите, что в нашем коде WxPython мы запускаем нить с помощью обработчика кнопок. Мы также отключаем кнопку, поэтому мы не случайно начинаем дополнительные потоки. Это было бы довольно запутано, если бы у нас была куча их идет, и UI случайно скажет, что это было сделано, когда это не было. Это хорошее упражнение для читателя, хотя. Вы можете отобразить PID нити, чтобы вы знали, что это было … и вы, возможно, захотите вывести эту информацию в контроль текста прокрутки, поэтому вы можете увидеть активность различных потоков.

Последний интерес здесь, вероятно, ресивер Pubsub и его обработчик событий:

def updateDisplay(self, msg):
    """
    Receives data from thread and updates the display
    """
    t = msg.data
    if isinstance(t, int):
        self.displayLbl.SetLabel("Time since thread started: %s seconds" % t)
    else:
        self.displayLbl.SetLabel("%s" % t)
        self.btn.Enable()

Посмотрите, как мы извлекаем сообщение из потока и используем его, чтобы обновить наш дисплей? Мы также используем тип данных, которые мы получаем, чтобы сказать нам, что показать пользователю. Довольно круто, да? Теперь давайте пойдем на уровень и проверьте, как это сделать с WX.PostEvent вместо.

WX.PostEvent и Threads.

Следующий код основан на примере от Wxpython Wiki Отказ Это немного сложнее, чем кода WX.callafter, на котором мы просто смотрели, но я уверен, что мы можем понять это.

import time
import wx

from threading import Thread

# Define notification event for thread completion
EVT_RESULT_ID = wx.NewId()

def EVT_RESULT(win, func):
    """Define Result Event."""
    win.Connect(-1, -1, EVT_RESULT_ID, func)

class ResultEvent(wx.PyEvent):
    """Simple event to carry arbitrary result data."""
    def __init__(self, data):
        """Init Result Event."""
        wx.PyEvent.__init__(self)
        self.SetEventType(EVT_RESULT_ID)
        self.data = data

########################################################################
class TestThread(Thread):
    """Test Worker Thread Class."""
        
    #----------------------------------------------------------------------
    def __init__(self, wxObject):
        """Init Worker Thread Class."""
        Thread.__init__(self)
        self.wxObject = wxObject
        self.start()    # start the thread

    #----------------------------------------------------------------------
    def run(self):
        """Run Worker Thread."""
        # This is the code executing in the new thread.
        for i in range(6):
            time.sleep(10)
            amtOfTime = (i + 1) * 10
            wx.PostEvent(self.wxObject, ResultEvent(amtOfTime))
        time.sleep(5)
        wx.PostEvent(self.wxObject, ResultEvent("Thread finished!"))
                    
########################################################################
class MyForm(wx.Frame):
 
    #----------------------------------------------------------------------
    def __init__(self):
        wx.Frame.__init__(self, None, wx.ID_ANY, "Tutorial")
 
        # Add a panel so it looks the correct on all platforms
        panel = wx.Panel(self, wx.ID_ANY)
        self.displayLbl = wx.StaticText(panel, label="Amount of time since thread started goes here")
        self.btn = btn = wx.Button(panel, label="Start Thread")

        btn.Bind(wx.EVT_BUTTON, self.onButton)

        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.displayLbl, 0, wx.ALL|wx.CENTER, 5)
        sizer.Add(btn, 0, wx.ALL|wx.CENTER, 5)
        panel.SetSizer(sizer)
        
        # Set up event handler for any worker thread results
        EVT_RESULT(self, self.updateDisplay)
        
    #----------------------------------------------------------------------
    def onButton(self, event):
        """
        Runs the thread
        """
        TestThread(self)
        self.displayLbl.SetLabel("Thread started!")
        btn = event.GetEventObject()
        btn.Disable()
    
    #----------------------------------------------------------------------
    def updateDisplay(self, msg):
        """
        Receives data from thread and updates the display
        """
        t = msg.data
        if isinstance(t, int):
            self.displayLbl.SetLabel("Time since thread started: %s seconds" % t)
        else:
            self.displayLbl.SetLabel("%s" % t)
            self.btn.Enable()
            
#----------------------------------------------------------------------
# Run the program
if __name__ == "__main__":
    app = wx.PySimpleApp()
    frame = MyForm().Show()
    app.MainLoop()

Давайте немного сломаемся. Для меня самые запутанные вещи – первые три штуки:

# Define notification event for thread completion
EVT_RESULT_ID = wx.NewId()

def EVT_RESULT(win, func):
    """Define Result Event."""
    win.Connect(-1, -1, EVT_RESULT_ID, func)

class ResultEvent(wx.PyEvent):
    """Simple event to carry arbitrary result data."""
    def __init__(self, data):
        """Init Result Event."""
        wx.PyEvent.__init__(self)
        self.SetEventType(EVT_RESULT_ID)
        self.data = data

EVT_RESULT_ID – это ключ здесь. Это связывает нить к WX.PYEVENT и эта странная функция «EVT_RESULT». В коде WXPYPHON мы связываем обработчик событий к функции EVT_RESULT. Это позволяет нам нам WX.PostEvent в потоке отправлять событие на наш пользовательский класс событий, решатель. Что это делает? Он посылает данные в программу WXPYPHON, излучаю этот пользовательский evt_result, который мы обязаны. Я надеюсь, что все имеет смысл.

После того, как вы получите это, если выясните в своей голове, прочитайте. Вы готовы? Хорошо! Вы заметите, что наше Testthread Класс в значительной степени такой же, как и раньше, за исключением того, что мы используем WX.PostEvent, чтобы отправить наши сообщения на GUI вместо Pubsub. API в нашем дисплее дисплея GUI не изменится. Мы все еще просто используем свойство данных сообщения для извлечения данных, которые мы хотим. Это все, что есть к этому!

Обертывание

Надеюсь, вы теперь знаете, как использовать основные методы резьбы в ваших программах WXPYPHON. Существует несколько других методов потоковых методов, которые у нас не было возможности покрыть здесь, например, использование WX.yield или очередей. К счастью, Wxpython Wiki покрывает эти темы довольно хорошо, поэтому обязательно проверьте ссылки ниже, если вы заинтересованы в этих методах.

Дальнейшее чтение

Загрузки

  • wxthreads.zip
  • wxdhreads.tar.tar.