• Using a background thread with asyncio/futures with flask

    From Thomas Nyberg@3:633/280.2 to All on Wed Mar 20 19:22:51 2024

    Hello,

    I have a simple (and not working) example of what I'm trying to do. This
    is a simplified version of what I'm trying to achieve (obviously the background workers and finalizer functions will do more later):

    `app.py`

    ```
    import asyncio
    import threading
    import time
    from queue import Queue

    from flask import Flask

    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future = out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=worker, daemon=True).start() threading.Thread(target=finalizer, daemon=True).start()

    app = Flask(__name__)


    @app.route("/")
    async def root():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"root put future: {future}")
    result = await future
    return result


    if __name__ == "__main__":
    app.run()
    ```

    If I start up that server, and execute `curl http://localhost:5000`, it
    prints out the following in the server before hanging:

    ```
    $ python3 app.py
    worker started running
    finalizer started running
    * Serving Flask app 'app'
    * Debug mode: off
    WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
    * Running on http://127.0.0.1:5000
    Press CTRL+C to quit
    root put future: <Future pending>
    worker got future: <Future pending cb=[Task.task_wakeup()]>
    worker sleeped
    finalizer got future: <Future pending cb=[Task.task_wakeup()]>
    finalizer set result
    ```

    Judging by what's printing out, the `final result = await future`
    doesn't seem to be happy here.

    Maybe someone sees something obvious I'm doing wrong here? I presume I'm mixing threads and asyncio in a way I shouldn't be.

    Here's some system information (just freshly installed with pip3 install flask[async] in a virtual environment for python version 3.11.2):

    ```
    $ uname -a
    Linux x1carbon 6.1.0-18-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.76-1 (2024-02-01) x86_64 GNU/Linux

    $ python3 -V
    Python 3.11.2

    $ pip3 freeze
    asgiref==3.7.2
    blinker==1.7.0
    click==8.1.7
    Flask==3.0.2
    itsdangerous==2.1.2
    Jinja2==3.1.3
    MarkupSafe==2.1.5
    Werkzeug==3.0.1
    ```

    Thanks for any help!

    Cheers,
    Thomas

    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From Lars Liedtke@3:633/280.2 to All on Fri Mar 22 18:27:28 2024

    0J07F+iZ4R8i5oU/TDzOBpzarHIv3S2dqI5VUEgojYB6RkotFNpzu2EzxaShUllKM+jRh xX2Gl7ln+ono2VP/7BlcqMh9iRtCB+mkAyY8035MToZnB0Mix7LQ9/vdz0Y3/zgRyFh1hUML lW1Y7nbrRSZYCEWaflsayaCCSHGqImG3OdWK500gv8jx0Veb+7BRwD8mM7NnpIXVmvONbMX/ 8xrwyrOC/Ob9Gf6uJcSRtCxk+zil0NsX75ibs/YjuxCjfUKEdzP7ATdbjktouPUAEQEAAcLB dgQYAQgAIBYhBOVpZah3AlnlMpCg1Og/5E450pgXBQJktRlRAhsMAAoJEOg/5E450pgX3E0P /13NzNCJU1anecvL+eSMynOHMhkN770oLwOfz+giiTRrYHxLhxzUs+e41uUY/7tbdrwmLxZe CNbBLC20YKHr+VvJ+LkJxz1GFJ0dlKLa+J1fERT6t+Sc5L5nmvFgZDwdFncrr6m+G27mqpc4 2QnABxu07Wu5dYDYN7XDbZAjB/0JBgMVKCVdbb2DVn+CquQ0j4wQbLGxsHPc5psBB+Hrpy+P nuWA9dAtVBz6ztI862VX9aXt2cO9Daoclwg5n1JewNglMZ1Kxishi9fmZ2KhSW+uq7X7eZDT +PHtWInMRhLsKCTrAYwBHpGxyYLYACswLAv2CuRbD9ZHaU32Jrz5D7i6LTC4mVDZenmu4E8y q3Xbuv3SKvRancajLiDRZd1Dj1gylshkYhfxp07ra21dD4SgkNldHXj1qsrgeXRbgEbHMdoq I5WBMqiyuBxnCQ903hRyW3ds9g6kDSwimysOGmy2VaqpwGBtb4hnleICCx77D+7kNKLkShkh IVVjkppgUsoHe09EczTzobrrSB8GltBrq0vvzMmHvvEj7ummpJmFCxtUjvngMdLtc7kUzMku V0/+zdlow2QR1RhXS+ksxrkVwYk6Zyp0285U2K8FKFQCKVroFm9jBr20AL2pYDU1aCNxoAO5 vXtW/dDJi28vvdyXPN5K33xtf+SFMcedqkCc
    Hey,

    As far as I know (might be old news) flask does not support asyncio.

    You would have to use a different framework, like e.g. FastAPI or similar. = Maybe someone has already written "flask with asyncio" but I don't know abo=
    ut that.

    Cheers

    Lars


    Lars Liedtke
    Lead Developer

    [Tel.] +49 721 98993-
    [Fax] +49 721 98993-
    [E-Mail] lal@solute.de<mailto:lal@solute.de>


    solute GmbH
    Zeppelinstra=C3=9Fe 15
    76185 Karlsruhe
    Germany

    [Marken]

    Gesch=C3=A4ftsf=C3=BChrer | Managing Director: Dr. Thilo Gans, Bernd Vermaa= ten
    Webseite | www.solute.de <http://www.solute.de/>
    Sitz | Registered Office: Karlsruhe
    Registergericht | Register Court: Amtsgericht Mannheim
    Registernummer | Register No.: HRB 748044
    USt-ID | VAT ID: DE234663798



    Informationen zum Datenschutz | Information about privacy policy https://www.solute.de/ger/datenschutz/grundsaetze-der-datenverarbeitung.php




    Am 20.03.24 um 09:22 schrieb Thomas Nyberg via Python-list:

    Hello,

    I have a simple (and not working) example of what I'm trying to do. This is=
    a simplified version of what I'm trying to achieve (obviously the backgrou=
    nd workers and finalizer functions will do more later):

    `app.py`

    ```
    import asyncio
    import threading
    import time
    from queue import Queue

    from flask import Flask

    in_queue =3D Queue()
    out_queue =3D Queue()


    def worker():
    print("worker started running")
    while True:
    future =3D in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future =3D out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=3Dworker, daemon=3DTrue).start() threading.Thread(target=3Dfinalizer, daemon=3DTrue).start()

    app =3D Flask(__name__)


    @app.route("/")
    async def root():
    future =3D asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"root put future: {future}")
    result =3D await future
    return result


    if __name__ =3D=3D "__main__":
    app.run()
    ```

    If I start up that server, and execute `curl http://localhost:5000`, it pri= nts out the following in the server before hanging:

    ```
    $ python3 app.py
    worker started running
    finalizer started running
    * Serving Flask app 'app'
    * Debug mode: off
    WARNING: This is a development server. Do not use it in a production deploy= ment. Use a production WSGI server instead.
    * Running on http://127.0.0.1:5000
    Press CTRL+C to quit
    root put future: <Future pending>
    worker got future: <Future pending cb=3D[Task.task_wakeup()]>
    worker sleeped
    finalizer got future: <Future pending cb=3D[Task.task_wakeup()]>
    finalizer set result
    ```

    Judging by what's printing out, the `final result =3D await future` doesn't=
    seem to be happy here.

    Maybe someone sees something obvious I'm doing wrong here? I presume I'm mi= xing threads and asyncio in a way I shouldn't be.

    Here's some system information (just freshly installed with pip3 install fl= ask[async] in a virtual environment for python version 3.11.2):

    ```
    $ uname -a
    Linux x1carbon 6.1.0-18-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.76-1 (2024-= 02-01) x86_64 GNU/Linux

    $ python3 -V
    Python 3.11.2

    $ pip3 freeze
    asgiref=3D=3D3.7.2
    blinker=3D=3D1.7.0
    click=3D=3D8.1.7
    Flask=3D=3D3.0.2
    itsdangerous=3D=3D2.1.2
    Jinja2=3D=3D3.1.3
    MarkupSafe=3D=3D2.1.5
    Werkzeug=3D=3D3.0.1
    ```

    Thanks for any help!

    Cheers,
    Thomas

    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: solute GmbH (3:633/280.2@fidonet)
  • From Chris Angelico@3:633/280.2 to All on Fri Mar 22 18:58:20 2024
    On Fri, 22 Mar 2024 at 18:35, Lars Liedtke via Python-list <python-list@python.org> wrote:

    Hey,

    As far as I know (might be old news) flask does not support asyncio.

    You would have to use a different framework, like e.g. FastAPI or similar. Maybe someone has already written "flask with asyncio" but I don't know about that.


    Did you try searching their documentation?

    https://flask.palletsprojects.com/en/3.0.x/async-await/

    ChrisA

    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From Thomas Nyberg@3:633/280.2 to All on Fri Mar 22 21:08:58 2024
    Hi,

    Yeah so flask does support async (when installed with `pip3 install flask[async]), but you are making a good point that flask in this case
    is a distraction. Here's an example using just the standard library that exhibits the same issue:

    `app.py`
    ```
    import asyncio
    import threading
    import time
    from queue import Queue


    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future = out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=worker).start() threading.Thread(target=finalizer).start()


    async def main():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"main put future: {future}")
    result = await future
    print(result)


    if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    ```

    If I run that I see the following printed out (after which is just hangs):

    ```
    $ python3 app.py
    worker started running
    finalizer started running
    main put future: <Future pending>
    worker got future: <Future pending>
    worker sleeped
    finalizer got future: <Future pending cb=[Task.task_wakeup()]>
    finalizer set result
    ```

    I believe async uses a cooperative multitasking setup under the hood, so
    I presume the way I'm doing this threading just isn't playing well with
    that (and presumably some csp yield isn't happening somewhere). Anyway
    at this point I feel like the easiest approach is to just throw away
    threads entirely and learn how to do all I want fully in the brave new
    async world, but I'm still curious why this is failing and how to make
    this sort of setup work since it points to my not understanding the
    basic implementation/semantics of async in python.

    Thanks for any help!

    /Thomas

    On 3/22/24 08:27, Lars Liedtke via Python-list wrote:
    Hey,

    As far as I know (might be old news) flask does not support asyncio.

    You would have to use a different framework, like e.g. FastAPI or
    similar. Maybe someone has already written "flask with asyncio" but I
    don't know about that.

    Cheers

    Lars


    Lars Liedtke
    Lead Developer

    [Tel.] +49 721 98993-
    [Fax] +49 721 98993-
    [E-Mail] lal@solute.de<mailto:lal@solute.de>


    solute GmbH
    Zeppelinstrae 15
    76185 Karlsruhe
    Germany

    [Marken]

    Geschftsfhrer | Managing Director: Dr. Thilo Gans, Bernd Vermaaten
    Webseite | www.solute.de <http://www.solute.de/>
    Sitz | Registered Office: Karlsruhe
    Registergericht | Register Court: Amtsgericht Mannheim
    Registernummer | Register No.: HRB 748044
    USt-ID | VAT ID: DE234663798



    Informationen zum Datenschutz | Information about privacy policy https://www.solute.de/ger/datenschutz/grundsaetze-der-datenverarbeitung.php




    Am 20.03.24 um 09:22 schrieb Thomas Nyberg via Python-list:

    Hello,

    I have a simple (and not working) example of what I'm trying to do. This
    is a simplified version of what I'm trying to achieve (obviously the background workers and finalizer functions will do more later):

    `app.py`

    ```
    import asyncio
    import threading
    import time
    from queue import Queue

    from flask import Flask

    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future = out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=worker, daemon=True).start() threading.Thread(target=finalizer, daemon=True).start()

    app = Flask(__name__)


    @app.route("/")
    async def root():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"root put future: {future}")
    result = await future
    return result


    if __name__ == "__main__":
    app.run()
    ```

    If I start up that server, and execute `curl http://localhost:5000`, it prints out the following in the server before hanging:

    ```
    $ python3 app.py
    worker started running
    finalizer started running
    * Serving Flask app 'app'
    * Debug mode: off
    WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
    * Running on http://127.0.0.1:5000
    Press CTRL+C to quit
    root put future: <Future pending>
    worker got future: <Future pending cb=[Task.task_wakeup()]>
    worker sleeped
    finalizer got future: <Future pending cb=[Task.task_wakeup()]>
    finalizer set result
    ```

    Judging by what's printing out, the `final result = await future`
    doesn't seem to be happy here.

    Maybe someone sees something obvious I'm doing wrong here? I presume I'm mixing threads and asyncio in a way I shouldn't be.

    Here's some system information (just freshly installed with pip3 install flask[async] in a virtual environment for python version 3.11.2):

    ```
    $ uname -a
    Linux x1carbon 6.1.0-18-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.76-1 (2024-02-01) x86_64 GNU/Linux

    $ python3 -V
    Python 3.11.2

    $ pip3 freeze
    asgiref==3.7.2
    blinker==1.7.0
    click==8.1.7
    Flask==3.0.2
    itsdangerous==2.1.2
    Jinja2==3.1.3
    MarkupSafe==2.1.5
    Werkzeug==3.0.1
    ```

    Thanks for any help!

    Cheers,
    Thomas

    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From Frank Millman@3:633/280.2 to All on Fri Mar 22 21:09:10 2024
    On 2024-03-20 10:22 AM, Thomas Nyberg via Python-list wrote:

    Hello,

    I have a simple (and not working) example of what I'm trying to do. This
    is a simplified version of what I'm trying to achieve (obviously the background workers and finalizer functions will do more later):

    `app.py`

    ```
    import asyncio
    import threading
    import time
    from queue import Queue

    from flask import Flask

    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future = out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=worker, daemon=True).start() threading.Thread(target=finalizer, daemon=True).start()

    app = Flask(__name__)


    @app.route("/")
    async def root():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"root put future: {future}")
    result = await future
    return result


    if __name__ == "__main__":
    app.run()
    ```

    If I start up that server, and execute `curl http://localhost:5000`, it prints out the following in the server before hanging:

    ```
    $ python3 app.py
    worker started running
    finalizer started running
    * Serving Flask app 'app'
    * Debug mode: off
    WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
    * Running on http://127.0.0.1:5000
    Press CTRL+C to quit
    root put future: <Future pending>
    worker got future: <Future pending cb=[Task.task_wakeup()]>
    worker sleeped
    finalizer got future: <Future pending cb=[Task.task_wakeup()]>
    finalizer set result
    ```

    Judging by what's printing out, the `final result = await future`
    doesn't seem to be happy here.

    Maybe someone sees something obvious I'm doing wrong here? I presume I'm mixing threads and asyncio in a way I shouldn't be.

    Here's some system information (just freshly installed with pip3 install flask[async] in a virtual environment for python version 3.11.2):

    ```
    $ uname -a
    Linux x1carbon 6.1.0-18-amd64 #1 SMP PREEMPT_DYNAMIC Debian 6.1.76-1 (2024-02-01) x86_64 GNU/Linux

    $ python3 -V
    Python 3.11.2

    $ pip3 freeze
    asgiref==3.7.2
    blinker==1.7.0
    click==8.1.7
    Flask==3.0.2
    itsdangerous==2.1.2
    Jinja2==3.1.3
    MarkupSafe==2.1.5
    Werkzeug==3.0.1
    ```

    Thanks for any help!

    Cheers,
    Thomas

    Hi Thomas

    I am no expert. However, I do have something similar in my app, and it
    works.

    I do not use 'await future', I use 'asyncio.wait_for(future)'.

    HTH

    Frank Millman



    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From Lars Liedtke@3:633/280.2 to All on Fri Mar 22 21:16:29 2024

    0J07F+iZ4R8i5oU/TDzOBpzarHIv3S2dqI5VUEgojYB6RkotFNpzu2EzxaShUllKM+jRh xX2Gl7ln+ono2VP/7BlcqMh9iRtCB+mkAyY8035MToZnB0Mix7LQ9/vdz0Y3/zgRyFh1hUML lW1Y7nbrRSZYCEWaflsayaCCSHGqImG3OdWK500gv8jx0Veb+7BRwD8mM7NnpIXVmvONbMX/ 8xrwyrOC/Ob9Gf6uJcSRtCxk+zil0NsX75ibs/YjuxCjfUKEdzP7ATdbjktouPUAEQEAAcLB dgQYAQgAIBYhBOVpZah3AlnlMpCg1Og/5E450pgXBQJktRlRAhsMAAoJEOg/5E450pgX3E0P /13NzNCJU1anecvL+eSMynOHMhkN770oLwOfz+giiTRrYHxLhxzUs+e41uUY/7tbdrwmLxZe CNbBLC20YKHr+VvJ+LkJxz1GFJ0dlKLa+J1fERT6t+Sc5L5nmvFgZDwdFncrr6m+G27mqpc4 2QnABxu07Wu5dYDYN7XDbZAjB/0JBgMVKCVdbb2DVn+CquQ0j4wQbLGxsHPc5psBB+Hrpy+P nuWA9dAtVBz6ztI862VX9aXt2cO9Daoclwg5n1JewNglMZ1Kxishi9fmZ2KhSW+uq7X7eZDT +PHtWInMRhLsKCTrAYwBHpGxyYLYACswLAv2CuRbD9ZHaU32Jrz5D7i6LTC4mVDZenmu4E8y q3Xbuv3SKvRancajLiDRZd1Dj1gylshkYhfxp07ra21dD4SgkNldHXj1qsrgeXRbgEbHMdoq I5WBMqiyuBxnCQ903hRyW3ds9g6kDSwimysOGmy2VaqpwGBtb4hnleICCx77D+7kNKLkShkh IVVjkppgUsoHe09EczTzobrrSB8GltBrq0vvzMmHvvEj7ummpJmFCxtUjvngMdLtc7kUzMku V0/+zdlow2QR1RhXS+ksxrkVwYk6Zyp0285U2K8FKFQCKVroFm9jBr20AL2pYDU1aCNxoAO5 vXtW/dDJi28vvdyXPN5K33xtf+SFMcedqkCc
    Sorry, must have missed that :-/


    Lars Liedtke
    Lead Developer

    [Tel.] +49 721 98993-
    [Fax] +49 721 98993-
    [E-Mail] lal@solute.de<mailto:lal@solute.de>


    solute GmbH
    Zeppelinstra=C3=9Fe 15
    76185 Karlsruhe
    Germany

    [Marken]

    Gesch=C3=A4ftsf=C3=BChrer | Managing Director: Dr. Thilo Gans, Bernd Vermaa= ten
    Webseite | www.solute.de <http://www.solute.de/>
    Sitz | Registered Office: Karlsruhe
    Registergericht | Register Court: Amtsgericht Mannheim
    Registernummer | Register No.: HRB 748044
    USt-ID | VAT ID: DE234663798



    Informationen zum Datenschutz | Information about privacy policy https://www.solute.de/ger/datenschutz/grundsaetze-der-datenverarbeitung.php




    Am 22.03.24 um 08:58 schrieb Chris Angelico via Python-list:

    On Fri, 22 Mar 2024 at 18:35, Lars Liedtke via Python-list <python-list@python.org><mailto:python-list@python.org> wrote:



    Hey,

    As far as I know (might be old news) flask does not support asyncio.

    You would have to use a different framework, like e.g. FastAPI or similar. = Maybe someone has already written "flask with asyncio" but I don't know abo=
    ut that.




    Did you try searching their documentation?

    https://flask.palletsprojects.com/en/3.0.x/async-await/

    ChrisA


    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: solute GmbH (3:633/280.2@fidonet)
  • From Frank Millman@3:633/280.2 to All on Fri Mar 22 22:23:43 2024
    On 2024-03-22 12:09 PM, Frank Millman via Python-list wrote:

    I am no expert. However, I do have something similar in my app, and it works.

    I do not use 'await future', I use 'asyncio.wait_for(future)'.


    I tested it and it did not work.

    I am not sure, but I think the problem is that you have a mixture of
    blocking and non-blocking functions.

    Here is a version that works. However, it is a bit different, so I don't
    know if it fits your use case.

    I have replaced the threads with background asyncio tasks.

    I have replaced instances of queue.Queue with asyncio.Queue.

    Frank

    ===============================================

    import asyncio

    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    async def worker():
    print("worker started running")
    while True:
    future = await in_queue.get()
    print(f"worker got future: {future}")
    await asyncio.sleep(5)
    print("worker sleeped")
    await out_queue.put(future)

    async def finalizer():
    print("finalizer started running")
    while True:
    future = await out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")

    async def main():
    asyncio.create_task(worker()) # start a background task
    asyncio.create_task(finalizer()) # ditto
    future = asyncio.get_event_loop().create_future()
    await in_queue.put(future)
    print(f"main put future: {future}")
    result = await asyncio.wait_for(future, timeout=None)
    print(result)

    if __name__ == "__main__":
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())

    # this is the preferred way to start an asyncio app
    asyncio.run(main())



    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From Frank Millman@3:633/280.2 to All on Fri Mar 22 22:28:56 2024
    On 2024-03-22 1:23 PM, Frank Millman via Python-list wrote:
    On 2024-03-22 12:09 PM, Frank Millman via Python-list wrote:

    I am no expert. However, I do have something similar in my app, and it
    works.

    I do not use 'await future', I use 'asyncio.wait_for(future)'.


    I tested it and it did not work.

    I am not sure, but I think the problem is that you have a mixture of blocking and non-blocking functions.

    Here is a version that works. However, it is a bit different, so I don't know if it fits your use case.

    I have replaced the threads with background asyncio tasks.

    I have replaced instances of queue.Queue with asyncio.Queue.

    Frank

    ===============================================

    import asyncio

    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    async def worker():
    print("worker started running")
    while True:
    future = await in_queue.get()
    print(f"worker got future: {future}")
    await asyncio.sleep(5)
    print("worker sleeped")
    await out_queue.put(future)

    async def finalizer():
    print("finalizer started running")
    while True:
    future = await out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")

    async def main():
    asyncio.create_task(worker()) # start a background task
    asyncio.create_task(finalizer()) # ditto
    future = asyncio.get_event_loop().create_future()
    await in_queue.put(future)
    print(f"main put future: {future}")
    result = await asyncio.wait_for(future, timeout=None)
    print(result)

    if __name__ == "__main__":
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())

    # this is the preferred way to start an asyncio app
    asyncio.run(main())



    One more point.

    If I change 'await asyncio.wait_for(future, timeout=None)' back to your original 'await future', it still works.


    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From dieter.maurer@online.de@3:633/280.2 to All on Sat Mar 23 04:28:25 2024
    Thomas Nyberg wrote at 2024-3-22 11:08 +0100:
    ... `future` use across thread boundaries ...
    Here's an example using just the standard library that
    exhibits the same issue:

    I think all `asyncio` objects (futures, tasks, ...)
    are meant to be used in a single thread.
    If you use them across different threads, you must do special things.

    Note that an `await(future)` registers a callback at *future*.
    When the future gets its result or exception, the registered callback call=
    s
    are scheduled via `self._loop.call_soon`.
    `call_soon` must be called from the `asyncio` thread (to which `self._loop=
    `
    belongs). A different thread may schedule activities for a loop
    but it must use `call_soon_threadsafe` (not `call_soon`).

    I would expect that the forbidden `call_soon` call raises an exception
    which for reasons I do not know appears to be hidden.


    For use across thread boundaries, you likely will use
    `concurrent.Future` (not `asyncio.Future`).
    You can use `asyncio.futures._chain_futures` to associate
    an `asyncio.Future` with a `concurrent.Future`.
    Then the fate (result or exception set) of one will be reflected in the ot= her.

    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From dieter.maurer@online.de@3:633/280.2 to All on Sat Mar 23 06:28:31 2024
    dieter.maurer@online.de wrote at 2024-3-22 18:28 +0100:
    Thomas Nyberg wrote at 2024-3-22 11:08 +0100:
    ... `future` use across thread boundaries ...
    Here's an example using just the standard library that
    exhibits the same issue:
    ...
    For use across thread boundaries, you likely will use
    `concurrent.Future` (not `asyncio.Future`).
    You can use `asyncio.futures._chain_futures` to associate
    an `asyncio.Future` with a `concurrent.Future`.
    Then the fate (result or exception set) of one will be reflected in the o= ther.

    You must not set the result/exception for a future in a "foreign" thread ("foreign" here means one not associated with the future's loop).
    An aternative to the solution sketched above is to set the result
    indirectly via `loop.call_soon_threadsafe`. This way, the
    result is set in the futures "native" thread.

    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From Mark Bourne@3:633/280.2 to All on Sat Mar 23 07:58:31 2024
    Thomas Nyberg wrote:
    Hi,

    Yeah so flask does support async (when installed with `pip3 install flask[async]), but you are making a good point that flask in this case
    is a distraction. Here's an example using just the standard library that exhibits the same issue:

    `app.py`
    ```
    import asyncio
    import threading
    import time
    from queue import Queue


    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future = out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=worker).start() threading.Thread(target=finalizer).start()


    async def main():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"main put future: {future}")
    result = await future
    print(result)


    if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    ```

    If I run that I see the following printed out (after which is just hangs):

    ```
    $ python3 app.py
    worker started running
    finalizer started running
    main put future: <Future pending>
    worker got future: <Future pending>
    worker sleeped
    finalizer got future: <Future pending cb=[Task.task_wakeup()]>
    finalizer set result
    ```

    I believe async uses a cooperative multitasking setup under the hood, so
    I presume the way I'm doing this threading just isn't playing well with
    that (and presumably some csp yield isn't happening somewhere). Anyway
    at this point I feel like the easiest approach is to just throw away
    threads entirely and learn how to do all I want fully in the brave new
    async world, but I'm still curious why this is failing and how to make
    this sort of setup work since it points to my not understanding the
    basic implementation/semantics of async in python.

    Thanks for any help!

    /Thomas

    On 3/22/24 08:27, Lars Liedtke via Python-list wrote:
    Hey,

    As far as I know (might be old news) flask does not support asyncio.

    You would have to use a different framework, like e.g. FastAPI or
    similar. Maybe someone has already written "flask with asyncio" but I
    don't know about that.

    Cheers

    Lars


    Lars Liedtke
    Lead Developer

    [Tel.] +49 721 98993-
    [Fax] +49 721 98993-
    [E-Mail] lal@solute.de<mailto:lal@solute.de>


    solute GmbH
    Zeppelinstrae 15
    76185 Karlsruhe
    Germany

    [Marken]

    Geschftsfhrer | Managing Director: Dr. Thilo Gans, Bernd Vermaaten
    Webseite | www.solute.de <http://www.solute.de/>
    Sitz | Registered Office: Karlsruhe
    Registergericht | Register Court: Amtsgericht Mannheim
    Registernummer | Register No.: HRB 748044
    USt-ID | VAT ID: DE234663798



    Informationen zum Datenschutz | Information about privacy policy
    https://www.solute.de/ger/datenschutz/grundsaetze-der-datenverarbeitung.php >>




    Am 20.03.24 um 09:22 schrieb Thomas Nyberg via Python-list:

    Hello,

    I have a simple (and not working) example of what I'm trying to do.
    This is a simplified version of what I'm trying to achieve (obviously
    the background workers and finalizer functions will do more later):

    `app.py`

    ```
    import asyncio
    import threading
    import time
    from queue import Queue

    from flask import Flask

    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future = out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=worker, daemon=True).start()
    threading.Thread(target=finalizer, daemon=True).start()

    app = Flask(__name__)


    @app.route("/")
    async def root():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"root put future: {future}")
    result = await future
    return result


    if __name__ == "__main__":
    app.run()
    ```

    If I start up that server, and execute `curl http://localhost:5000`,
    it prints out the following in the server before hanging:

    ```
    $ python3 app.py
    worker started running
    finalizer started running
    * Serving Flask app 'app'
    * Debug mode: off
    WARNING: This is a development server. Do not use it in a production
    deployment. Use a production WSGI server instead.
    * Running on http://127.0.0.1:5000
    Press CTRL+C to quit
    root put future: <Future pending>
    worker got future: <Future pending cb=[Task.task_wakeup()]>
    worker sleeped
    finalizer got future: <Future pending cb=[Task.task_wakeup()]>
    finalizer set result
    ```

    Judging by what's printing out, the `final result = await future`
    doesn't seem to be happy here.

    Maybe someone sees something obvious I'm doing wrong here? I presume
    I'm mixing threads and asyncio in a way I shouldn't be.

    Aside from possible issues mixing threads and asyncio (I'm no expert on asyncio), there's also the issue that there's nothing to cause the
    threads to exit. The following doesn't use asyncio, but also hangs
    after the main thread has got the result:

    ```
    import queue
    import threading
    import time

    in_queue = queue.Queue()
    out_queue = queue.Queue()
    result_queue = queue.Queue()

    def worker():
    print("worker started running")
    while True:
    item = in_queue.get()
    print(f"worker got item: {item}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(item)


    def finalizer():
    print("finalizer started running")
    while True:
    item = out_queue.get()
    print(f"finalizer got item: {item}")
    result_queue.put(item)
    print("finalizer set result")


    threading.Thread(target=worker).start() threading.Thread(target=finalizer).start()
    # threading.Thread(target=worker, daemon=True).start()
    # threading.Thread(target=finalizer, daemon=True).start()


    def main():
    item = "Item to process"
    in_queue.put("Item to process")
    print(f"main put item: {item}")
    result = None
    while True:
    try:
    result = result_queue.get(timeout=1)
    except queue.Empty:
    # No result yet
    print("main waiting for result")
    continue
    break
    print(f"main got result {result}")


    if __name__ == "__main__":
    main()
    ```

    By default, the main process won't exit until there are no non-daemon
    threads still running. You can either send some sort of signal to the
    threads signal the threads to exit the loop and return cleanly (you'd
    also need a timeout on the queue `get()` calls). Or you can create the threads as "daemon" threads (as in the commented-out lines), in which
    case they'll be killed when all non-daemon threads have exited. Daemon threads don't get a chance to do any cleanup, close resources, etc. when they're killed, though, so aren't always appropriate.

    --
    Mark.

    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: A noiseless patient Spider (3:633/280.2@fidonet)
  • From Frank Millman@3:633/280.2 to All on Sun Mar 24 00:25:28 2024
    On 2024-03-22 12:08 PM, Thomas Nyberg via Python-list wrote:
    Hi,

    Yeah so flask does support async (when installed with `pip3 install flask[async]), but you are making a good point that flask in this case
    is a distraction. Here's an example using just the standard library that exhibits the same issue:

    `app.py`
    ```
    import asyncio
    import threading
    import time
    from queue import Queue


    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put(future)


    def finalizer():
    print("finalizer started running")
    while True:
    future = out_queue.get()
    print(f"finalizer got future: {future}")
    future.set_result("completed")
    print("finalizer set result")


    threading.Thread(target=worker).start() threading.Thread(target=finalizer).start()


    async def main():
    future = asyncio.get_event_loop().create_future()
    in_queue.put(future)
    print(f"main put future: {future}")
    result = await future
    print(result)


    if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    ```

    If I run that I see the following printed out (after which is just hangs):

    ```

    Combining Dieter's and Mark's ideas, here is a version that works.

    It is not pretty! call_soon_threadsafe() is a loop function, but the
    loop is not accessible from a different thread. Therefore I include a reference to the loop in the message passed to in_queue, which in turn
    passes it to out_queue.

    Frank

    =======================================================

    import asyncio
    import threading
    import time
    from queue import Queue


    in_queue = Queue()
    out_queue = Queue()


    def worker():
    print("worker started running")
    while True:
    loop, future = in_queue.get()
    print(f"worker got future: {future}")
    time.sleep(5)
    print("worker sleeped")
    out_queue.put((loop, future))


    def finalizer():
    print("finalizer started running")
    while True:
    loop, future = out_queue.get()
    print(f"finalizer got future: {future}")
    loop.call_soon_threadsafe(future.set_result, "completed")
    print("finalizer set result")


    threading.Thread(target=worker, daemon=True).start() threading.Thread(target=finalizer, daemon=True).start()


    async def main():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    in_queue.put((loop, future))
    print(f"main put future: {future}")
    result = await future
    print(result)


    if __name__ == "__main__":
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())
    asyncio.run(main())


    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)
  • From Frank Millman@3:633/280.2 to All on Sun Mar 24 18:34:06 2024
    On 2024-03-23 3:25 PM, Frank Millman via Python-list wrote:


    It is not pretty! call_soon_threadsafe() is a loop function, but the
    loop is not accessible from a different thread. Therefore I include a reference to the loop in the message passed to in_queue, which in turn passes it to out_queue.


    I found that you can retrieve the loop from the future using future.get_loop(), so the above is not necessary.

    Frank



    --- MBSE BBS v1.0.8.4 (Linux-x86_64)
    * Origin: ---:- FTN<->UseNet Gate -:--- (3:633/280.2@fidonet)