diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..82195aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv +.idea \ No newline at end of file diff --git a/README.md b/README.md index 94de273..87a8099 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,10 @@ Online Version: http://sdiehl.github.com/gevent-tutorial/ Want to add an example. Its uber simple. 1. Fork the repo. -2. ``pip install -r requirements.txt`` -3. Edit ``tutorial.md``. +2. Create a virtual environment using the [virtualenv](https://pypi.org/project/virtualenv/) package by +running `virtualenv venv`, then activate it. You can follow its documentation if you don't know how to use it. +3. ``pip install -r requirements.txt`` +4. Edit ``tutorial.md``. Add your text as Markdown. @@ -43,9 +45,9 @@ Will output this as html: -4. Run ``./build`` -5. Issue pull request. -6. Get good gevent karma. +5. Run ``python build`` +6. Issue pull request. +7. Get good gevent karma. Released under MIT License. diff --git a/body.tmpl b/body.tmpl index 3bbc458..b32d0ab 100644 --- a/body.tmpl +++ b/body.tmpl @@ -39,7 +39,9 @@
- gevent is a concurrency library based around libev. It provides a clean API for a variety of concurrency and network related tasks. + gevent is a concurrency library based around libev or + its new powerful descendant libuv. + It provides a clean API for a variety of concurrency and network related tasks.
diff --git a/build b/build index b593022..9349798 100755 --- a/build +++ b/build @@ -1,14 +1,13 @@ #!/usr/bin/env python -import re +import codecs import os +import re import sys -import codecs +from io import StringIO + import markdown -from cStringIO import StringIO from cogapp import Cog -from jinja2 import Template - from jinja2 import Environment, FileSystemLoader OUTPUT = 'index.html' @@ -35,8 +34,8 @@ start_code = """
"""
 end_code = """
""" r1 = re.sub('\[\[\[end\]\]\]', end_code, cogged) -r2 = re.sub(r'\[\[\[cog',start_code, r1) -r3 = re.sub(r'\]\]\]',end_code + '\n' + start_code, r2) +r2 = re.sub(r'\[\[\[cog', start_code, r1) +r3 = re.sub(r'\]\]\]', end_code + '\n' + start_code, r2) rendered_code = r3 @@ -44,4 +43,4 @@ body = markdown.markdown(rendered_code, extensions=['toc']) with open(OUTPUT, 'w+') as f: f.write(template.render(body=body)) -print 'Wrote', OUTPUT +print('Wrote', OUTPUT) diff --git a/index.html b/index.html index ff3c4f7..420571d 100644 --- a/index.html +++ b/index.html @@ -39,7 +39,9 @@

- gevent is a concurrency library based around libev. It provides a clean API for a variety of concurrency and network related tasks. + gevent is a concurrency library based around libev or + its new powerful descendant libuv. + It provides a clean API for a variety of concurrency and network related tasks.
@@ -78,6 +80,7 @@

  • Long Polling
  • Websockets
  • Chat Server
  • +
  • HTTP2 Server
  • @@ -91,37 +94,38 @@

    Introduction

    applications today.

    Contributors

    In chronological order of contribution: -Stephen Diehl -Jérémy Bethmont -sww -Bruno Bigras -David Ripton -Travis Cline -Boris Feld -youngsterxyf -Eddie Hebert -Alexis Metaireau -Daniel Velkov -Sean Wang -Inada Naoki -Balthazar Rouberol -Glen Baker -Jan-Philip Gehrcke -Matthijs van der Vleuten -Simon Hayward -Alexander James Phillips -Ramiro Morales -Philip Damra -Francisco José Marques Vieira -David Xia -satoru -James Summerfield -Adam Szkoda -Roy Smith -Jianbin Wei -Anton Larkin -Matias Herranz -Pietro Bertera

    +Stephen Diehl, +Jérémy Bethmont, +sww, +Bruno Bigras, +David Ripton, +Travis Cline, +Boris Feld, +youngsterxyf, +Eddie Hebert, +Alexis Metaireau, +Daniel Velkov, +Sean Wang, +Inada Naoki, +Balthazar Rouberol, +Glen Baker, +Jan-Philip Gehrcke, +Matthijs van der Vleuten, +Simon Hayward, +Alexander James Phillips, +Ramiro Morales, +Philip Damra, +Francisco José Marques Vieira, +David Xia, +satoru, +James Summerfield, +Adam Szkoda, +Roy Smith, +Jianbin Wei, +Anton Larkin, +Matias Herranz, +Pietro Bertera, +Kevin Tewouda

    Also thanks to Denis Bilenko for writing gevent and guidance in constructing this tutorial.

    This is a collaborative document published under MIT license. @@ -244,11 +248,11 @@

    Synchronous & Asynchronous Execu print('Task %s done' % pid) def synchronous(): - for i in xrange(10): + for i in range(10): task(i) def asynchronous(): - threads = [gevent.spawn(task, i) for i in xrange(10)] + threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:') @@ -273,15 +277,15 @@

    Synchronous & Asynchronous Execu Task 9 done Asynchronous: Task 1 done +Task 4 done +Task 8 done +Task 2 done Task 6 done Task 0 done Task 3 done -Task 4 done Task 5 done Task 7 done Task 9 done -Task 2 done -Task 8 done

    In the synchronous case all the tasks are run sequentially, which results in the main programming blocking ( @@ -308,12 +312,13 @@

    Synchronous & Asynchronous Execu
    import gevent.monkey
     gevent.monkey.patch_socket()
     
    +import json
    +from urllib.request import urlopen
    +
     import gevent
    -import urllib2
    -import simplejson as json
     
     def fetch(pid):
    -    response = urllib2.urlopen('http://jsontime.herokuapp.com/')
    +    response = urlopen('http://jsontime.herokuapp.com/')
         result = response.read()
         json_result = json.loads(result)
         datetime = json_result['datetime']
    @@ -347,34 +352,32 @@ 

    Determinism

     
     import time
    +from multiprocessing.pool import Pool
    +
    +from gevent.pool import Pool as GPool
     
     def echo(i):
         time.sleep(0.001)
         return i
     
    -# Non Deterministic Process Pool
    -
    -from multiprocessing.pool import Pool
    -
    -p = Pool(10)
    -run1 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run2 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run3 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run4 = [a for a in p.imap_unordered(echo, xrange(10))]
    -
    -print(run1 == run2 == run3 == run4)
    -
    -# Deterministic Gevent Pool
    -
    -from gevent.pool import Pool
    -
    -p = Pool(10)
    -run1 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run2 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run3 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run4 = [a for a in p.imap_unordered(echo, xrange(10))]
    -
    -print(run1 == run2 == run3 == run4)
    +if __name__ == '__main__':
    +    # Non Deterministic Process Pool
    +    with Pool(processes=10) as pool:
    +        run1 = [a for a in pool.imap_unordered(echo, range(10))]
    +        run2 = [a for a in pool.imap_unordered(echo, range(10))]
    +        run3 = [a for a in pool.imap_unordered(echo, range(10))]
    +        run4 = [a for a in pool.imap_unordered(echo, range(10))]
    +
    +    print(run1 == run2 == run3 == run4)
    +
    +    # Deterministic Gevent Pool
    +    p = GPool(10)
    +    run1 = [a for a in p.imap_unordered(echo, range(10))]
    +    run2 = [a for a in p.imap_unordered(echo, range(10))]
    +    run3 = [a for a in p.imap_unordered(echo, range(10))]
    +    run4 = [a for a in p.imap_unordered(echo, range(10))]
    +
    +    print(run1 == run2 == run3 == run4)
     
     
    @@ -532,10 +535,10 @@

    Greenlet State

    Program Shutdown

    Greenlets that fail to yield when the main program receives a -SIGQUIT may hold the program's execution longer than expected. +SIGINT may hold the program's execution longer than expected. This results in so called "zombie processes" which need to be killed from outside of the Python interpreter.

    -

    A common pattern is to listen SIGQUIT events on the main program +

    A common pattern is to listen SIGINT events on the main program and to invoke gevent.kill or gevent.killall before exit.

     import gevent
    @@ -546,7 +549,7 @@ 

    Program Shutdown

    if __name__ == '__main__': thread = gevent.spawn(run_forever) - gevent.signal(signal.SIGQUIT, gevent.kill, thread) + gevent.signal_handler(signal.SIGINT, gevent.kill, thread) thread.join()
    @@ -721,7 +724,8 @@

    Events

    gevent.spawn(waiter) ]) -if __name__ == '__main__': main() +if __name__ == '__main__': + main() @@ -780,7 +784,7 @@

    Queues

    print('Quitting time!') def boss(): - for i in xrange(1,25): + for i in range(1,25): tasks.put_nowait(i) gevent.spawn(boss).join() @@ -858,12 +862,12 @@

    Queues

    free since the maxsize of the task queue is 3. """ - for i in xrange(1,10): + for i in range(1,10): print('Assigned work %s in iteration 1' % (i)) tasks.put(i) print('Assigned all work in iteration 1') - for i in xrange(10,20): + for i in range(10,20): print('Assigned work %s in iteration 2' % (i)) tasks.put(i) print('Assigned all work in iteration 2') @@ -931,7 +935,7 @@

    Groups and Pools

    from gevent.pool import Group def talk(msg): - for i in xrange(3): + for i in range(3): print(msg) g1 = gevent.spawn(talk, 'bar') @@ -974,7 +978,7 @@

    Groups and Pools

    print('Size of group %s' % len(group)) print('Hello from Greenlet %s' % id(getcurrent())) -group.map(hello_from, xrange(3)) +group.map(hello_from, range(3)) def intensive(n): gevent.sleep(3 - n) @@ -983,13 +987,13 @@

    Groups and Pools

    print('Ordered') ogroup = Group() -for i in ogroup.imap(intensive, xrange(3)): +for i in ogroup.imap(intensive, range(3)): print(i) print('Unordered') igroup = Group() -for i in igroup.imap_unordered(intensive, xrange(3)): +for i in igroup.imap_unordered(intensive, range(3)): print(i) @@ -997,11 +1001,11 @@

    Groups and Pools

    
     Size of group 3
    -Hello from Greenlet 4405439216
    +Hello from Greenlet 71908208
     Size of group 3
    -Hello from Greenlet 4405439056
    +Hello from Greenlet 71908352
     Size of group 3
    -Hello from Greenlet 4405440336
    +Hello from Greenlet 71908496
     Ordered
     ('task', 0)
     ('task', 1)
    @@ -1024,7 +1028,7 @@ 

    Groups and Pools

    def hello_from(n): print('Size of pool %s' % len(pool)) -pool.map(hello_from, xrange(3)) +pool.map(hello_from, range(3))

    @@ -1090,8 +1094,8 @@

    Locks and Semaphores

    print('Worker %i released semaphore' % n) pool = Pool() -pool.map(worker1, xrange(0,2)) -pool.map(worker2, xrange(3,6)) +pool.map(worker1, range(0,2)) +pool.map(worker2, range(3,6))

    @@ -1186,7 +1190,7 @@

    Thread Locals

    WSGIServer(('', 8000), application).serve_forever() - +

    Flask's system is a bit more sophisticated than this example, but the @@ -1211,6 +1215,7 @@

    Subprocess

    out, err = sub.communicate() g.kill() print(out.rstrip()) +
    @@ -1221,7 +1226,7 @@ 

    Subprocess

    cron cron Linux - +

    Many people also want to use gevent and multiprocessing together. One of @@ -1244,17 +1249,17 @@

    Subprocess

    c, d = Pipe() def relay(): - for i in xrange(10): + for i in range(10): msg = b.recv() c.send(msg + " in " + str(i)) def put_msg(): - for i in xrange(10): + for i in range(10): wait_write(a.fileno()) a.send('hi') def get_msg(): - for i in xrange(10): + for i in range(10): wait_read(d.fileno()) print(d.recv()) @@ -1299,9 +1304,10 @@

    Actors

    one very simply using a Queue inside of a subclassed Greenlet.

     import gevent
    +from gevent import Greenlet
     from gevent.queue import Queue
     
    -class Actor(gevent.Greenlet):
    +class Actor(Greenlet):
     
         def __init__(self):
             self.inbox = Queue()
    @@ -1320,14 +1326,7 @@ 

    Actors

    message = self.inbox.get() self.receive(message) -
    -
    - -

    In a use case:

    -
    -import gevent
    -from gevent.queue import Queue
    -from gevent import Greenlet
    +# use case
     
     class Pinger(Actor):
         def receive(self, message):
    @@ -1348,7 +1347,7 @@ 

    Actors

    pong.start() ping.inbox.put('start') -gevent.joinall([ping, pong]) +gevent.joinall([ping, pong], timeout=2)
    @@ -1378,7 +1377,7 @@

    Gevent ZeroMQ

    server_socket.bind("tcp://127.0.0.1:5000") for request in range(1,10): - server_socket.send("Hello") + server_socket.send(b"Hello") print('Switched to Server for %s' % request) # Implicit context switch occurs here server_socket.recv() @@ -1392,7 +1391,7 @@

    Gevent ZeroMQ

    client_socket.recv() print('Switched to Client for %s' % request) # Implicit context switch occurs here - client_socket.send("World") + client_socket.send(b"World") publisher = gevent.spawn(server) client = gevent.spawn(client) @@ -1431,9 +1430,9 @@

    Simple Servers

    from gevent.server import StreamServer def handle(socket, address): - socket.send("Hello from a telnet!\n") + socket.send(b"Hello from a telnet!\n") for i in range(5): - socket.send(str(i) + '\n') + socket.send('{}\n'.format(i).encode()) socket.close() server = StreamServer(('127.0.0.1', 5000), handle) @@ -1517,11 +1516,9 @@

    Streaming Servers

    But regardless, performance on Gevent servers is phenomenal -compared to other Python servers. libev is a very vetted technology +compared to other Python servers. libev/libuv is a very vetted technology and its derivative servers are known to perform well at scale.

    -

    To benchmark, try Apache Benchmark ab or see this -Benchmark of Python WSGI Servers -for comparison with other servers.

    +

    To benchmark, try Apache Benchmark ab or locust for comparison with other servers.

     $ ab -n 10000 -c 100 http://127.0.0.1:8000/
     
    @@ -1530,9 +1527,8 @@ 

    Streaming Servers

    Long Polling

     import gevent
    -from gevent.queue import Queue, Empty
     from gevent.pywsgi import WSGIServer
    -import simplejson as json
    +from gevent.queue import Queue, Empty
     
     data_source = Queue()
     
    @@ -1552,7 +1548,7 @@ 

    Long Polling

    while True: try: datum = data_source.get(timeout=5) - yield json.dumps(datum) + '\n' + yield '{}\n'.format(json.dumps(datum)).encode() except Empty: pass @@ -1626,21 +1622,20 @@

    Websockets

    </html>

    Chat Server

    -

    The final motivating example, a realtime chat room. This example -requires Flask ( but not necessarily so, you could use Django, +

    In this example we will build a realtime chat room. It +requires Flask ( but not necessarily so, you could use Django, Pyramid, etc ). The corresponding Javascript and HTML files can be found here.

     # Micro gevent chatroom.
     # ----------------------
     
    -from flask import Flask, render_template, request
    +import json
     
    +from flask import Flask, render_template, request
     from gevent import queue
     from gevent.pywsgi import WSGIServer
     
    -import simplejson as json
    -
     app = Flask(__name__)
     app.debug = True
     
    @@ -1724,6 +1719,202 @@ 

    Chat Server

    http.serve_forever()
    + +

    HTTP2 Server

    +

    In this last example, we will create an HTTP/2 server +(HTTP/2 is the next generation of the HTTP protocol) which will serve static files to a browser. +The files are served from a specific folder if one is specified; if not, they will be served from the +current folder where you run the server. To test this example, you will need Python 3.6+ and +the excellent sans-io library h2. +To generate the private key localhost.key and the self-signed certificate localhost.crt +used in the following code you will need the OpenSSL toolkit. If you don't know how to use it, +you can check this +tutorial

    +
    +import mimetypes
    +import sys
    +from functools import partial
    +from pathlib import Path
    +from typing import Tuple, Dict
    +
    +from gevent import socket, ssl
    +from gevent.event import Event
    +from gevent.server import StreamServer
    +from h2 import events
    +from h2.config import H2Configuration
    +from h2.connection import H2Connection
    +
    +def get_http2_tls_context() -> ssl.SSLContext:
    +    ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    +    # RFC 7540 Section 9.2: Implementations of HTTP/2 MUST use TLS version 1.2
    +    # or higher. Disable TLS 1.1 and lower.
    +    ctx.options |= (
    +            ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    +    )
    +    # RFC 7540 Section 9.2.1: A deployment of HTTP/2 over TLS 1.2 MUST disable
    +    # compression.
    +    ctx.options |= ssl.OP_NO_COMPRESSION
    +    ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
    +    ctx.load_cert_chain(certfile='localhost.crt', keyfile='localhost.key')
    +    ctx.set_alpn_protocols(['h2'])
    +    try:
    +        ctx.set_npn_protocols(['h2'])
    +    except NotImplementedError:
    +        pass
    +
    +    return ctx
    +
    +class H2Worker:
    +
    +    def __init__(self, sock: socket, address: Tuple[str, str], source_dir: str = None):
    +        self._sock = sock
    +        self._address = address
    +        self._flow_control_events: Dict[int, Event] = {}
    +        self._server_name = 'gevent-h2'
    +        self._connection: H2Connection = None
    +        # The maximum amount of a file we'll send in a single DATA frame
    +        self._read_chunk_size = 8192
    +
    +        self._check_sources_dir(source_dir)
    +        self._sources_dir = source_dir
    +
    +        self._run()
    +
    +    def _initiate_connection(self):
    +        h2_config = H2Configuration(client_side=False, header_encoding='utf-8')
    +        self._connection = H2Connection(h2_config)
    +        self._connection.initiate_connection()
    +        self._sock.sendall(self._connection.data_to_send())
    +
    +    @staticmethod
    +    def _check_sources_dir(sources_dir: str) -> None:
    +        p = Path(sources_dir)
    +        if not p.is_dir():
    +            raise NotADirectoryError(f'{sources_dir} does not exists')
    +
    +    def _send_error_response(self, status_code: str,
    +                             event: events.RequestReceived) -> None:
    +        self._connection.send_headers(
    +            stream_id=event.stream_id,
    +            headers=[
    +                (':status', status_code),
    +                ('content-length', '0'),
    +                ('server', self._server_name),
    +            ],
    +            end_stream=True
    +        )
    +        self._sock.sendall(self._connection.data_to_send())
    +
    +    def _handle_request(self, event: events.RequestReceived) -> None:
    +        headers = dict(event.headers)
    +        if headers[':method'] != 'GET':
    +            self._send_error_response('405', event)
    +            return
    +
    +        file_path = Path(self._sources_dir) / headers[':path'].lstrip('/')
    +        if not file_path.is_file():
    +            self._send_error_response('404', event)
    +            return
    +
    +        self._send_file(file_path, event.stream_id)
    +
    +    def _send_file(self, file_path: Path, stream_id: int) -> None:
    +        """
    +        Send a file, obeying the rules of HTTP/2 flow control.
    +        """
    +        file_size = file_path.stat().st_size
    +        content_type, content_encoding = mimetypes.guess_type(str(file_path))
    +        response_headers = [
    +            (':status', '200'),
    +            ('content-length', str(file_size)),
    +            ('server', self._server_name)
    +        ]
    +        if content_type:
    +            response_headers.append(('content-type', content_type))
    +        if content_encoding:
    +            response_headers.append(('content-encoding', content_encoding))
    +
    +        self._connection.send_headers(stream_id, response_headers)
    +        self._sock.sendall(self._connection.data_to_send())
    +
    +        with file_path.open(mode='rb', buffering=0) as f:
    +            self._send_file_data(f, stream_id)
    +
    +    def _send_file_data(self, file_obj, stream_id: int) -> None:
    +        """
    +        Send the data portion of a file. Handles flow control rules.
    +        """
    +        while True:
    +            while self._connection.local_flow_control_window(stream_id) < 1:
    +                self._wait_for_flow_control(stream_id)
    +
    +            chunk_size = min(
    +                self._connection.local_flow_control_window(stream_id), self._read_chunk_size
    +                )
    +            data = file_obj.read(chunk_size)
    +            keep_reading = (len(data) == chunk_size)
    +
    +            self._connection.send_data(stream_id, data, not keep_reading)
    +            self._sock.sendall(self._connection.data_to_send())
    +
    +            if not keep_reading:
    +                break
    +
    +    def _wait_for_flow_control(self, stream_id: int) -> None:
    +        """
    +        Blocks until the flow control window for a given stream is opened.
    +        """
    +        event = Event()
    +        self._flow_control_events[stream_id] = event
    +        event.wait()
    +
    +    def _handle_window_update(self, event: events.WindowUpdated) -> None:
    +        """
    +        Unblock streams waiting on flow control, if needed.
    +        """
    +        stream_id = event.stream_id
    +
    +        if stream_id and stream_id in self._flow_control_events:
    +            g_event = self._flow_control_events.pop(stream_id)
    +            g_event.set()
    +        elif not stream_id:
    +            # Need to keep a real list here to use only the events present at this time.
    +            blocked_streams = list(self._flow_control_events.keys())
    +            for stream_id in blocked_streams:
    +                g_event = self._flow_control_events.pop(stream_id)
    +                g_event.set()
    +
    +    def _run(self) -> None:
    +        self._initiate_connection()
    +
    +        while True:
    +            data = self._sock.recv(65535)
    +            if not data:
    +                break
    +
    +            h2_events = self._connection.receive_data(data)
    +            for event in h2_events:
    +                if isinstance(event, events.RequestReceived):
    +                    self._handle_request(event)
    +                elif isinstance(event, events.DataReceived):
    +                    self._connection.reset_stream(event.stream_id)
    +                elif isinstance(event, events.WindowUpdated):
    +                    self._handle_window_update(event)
    +
    +            data_to_send = self._connection.data_to_send()
    +            if data_to_send:
    +                self._sock.sendall(data_to_send)
    +
    +if __name__ == '__main__':
    +    files_dir = sys.argv[1] if len(sys.argv) > 1 else f'{Path().cwd()}'
    +    server = StreamServer(('127.0.0.1', 8080), partial(H2Worker, source_dir=files_dir),
    +                          ssl_context=get_http2_tls_context())
    +    try:
    +        server.serve_forever()
    +    except KeyboardInterrupt:
    +        server.close()
    +
    +
    diff --git a/requirements.txt b/requirements.txt index b17eeac..e11a640 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ Markdown==2.1.1 cogapp==2.3 -Jinja2==2.6 +Jinja2 gevent pyzmq diff --git a/tutorial.md b/tutorial.md index bcf78f9..0cba20c 100644 --- a/tutorial.md +++ b/tutorial.md @@ -12,37 +12,38 @@ applications today. ### Contributors In chronological order of contribution: -[Stephen Diehl](http://www.stephendiehl.com) -[Jérémy Bethmont](https://github.com/jerem) -[sww](https://github.com/sww) -[Bruno Bigras](https://github.com/brunoqc) -[David Ripton](https://github.com/dripton) -[Travis Cline](https://github.com/traviscline) -[Boris Feld](https://github.com/Lothiraldan) -[youngsterxyf](https://github.com/youngsterxyf) -[Eddie Hebert](https://github.com/ehebert) -[Alexis Metaireau](http://notmyidea.org) -[Daniel Velkov](https://github.com/djv) -[Sean Wang](https://github.com/sww) -[Inada Naoki](https://github.com/methane) -[Balthazar Rouberol](https://github.com/brouberol) -[Glen Baker](https://github.com/iepathos) -[Jan-Philip Gehrcke](https://gehrcke.de) -[Matthijs van der Vleuten](https://github.com/zr40) -[Simon Hayward](http://simonsblog.co.uk) -[Alexander James Phillips](https://github.com/AJamesPhillips) -[Ramiro Morales](https://github.com/ramiro) -[Philip Damra](https://github.com/djheru) -[Francisco José Marques Vieira](https://github.com/fvieira) -[David Xia](https://www.davidxia.com) -[satoru](https://github.com/satoru) -[James Summerfield](https://github.com/jsummerfield) -[Adam Szkoda](https://github.com/adaszko) -[Roy Smith](https://github.com/roysmith) -[Jianbin Wei](https://github.com/jianbin-netskope) -[Anton Larkin](https://github.com/ToxicWar) -[Matias Herranz](https://github.com/matiasherranz-santex) -[Pietro Bertera](http://www.bertera.it) +[Stephen Diehl](http://www.stephendiehl.com), +[Jérémy Bethmont](https://github.com/jerem), +[sww](https://github.com/sww), +[Bruno Bigras](https://github.com/brunoqc), 
+[David Ripton](https://github.com/dripton), +[Travis Cline](https://github.com/traviscline), +[Boris Feld](https://github.com/Lothiraldan), +[youngsterxyf](https://github.com/youngsterxyf), +[Eddie Hebert](https://github.com/ehebert), +[Alexis Metaireau](http://notmyidea.org), +[Daniel Velkov](https://github.com/djv), +[Sean Wang](https://github.com/sww), +[Inada Naoki](https://github.com/methane), +[Balthazar Rouberol](https://github.com/brouberol), +[Glen Baker](https://github.com/iepathos), +[Jan-Philip Gehrcke](https://gehrcke.de), +[Matthijs van der Vleuten](https://github.com/zr40), +[Simon Hayward](http://simonsblog.co.uk), +[Alexander James Phillips](https://github.com/AJamesPhillips), +[Ramiro Morales](https://github.com/ramiro), +[Philip Damra](https://github.com/djheru), +[Francisco José Marques Vieira](https://github.com/fvieira), +[David Xia](https://www.davidxia.com), +[satoru](https://github.com/satoru), +[James Summerfield](https://github.com/jsummerfield), +[Adam Szkoda](https://github.com/adaszko), +[Roy Smith](https://github.com/roysmith), +[Jianbin Wei](https://github.com/jianbin-netskope), +[Anton Larkin](https://github.com/ToxicWar), +[Matias Herranz](https://github.com/matiasherranz-santex), +[Pietro Bertera](http://www.bertera.it), +[Kevin Tewouda](https://github.com/lewoudar) Also thanks to Denis Bilenko for writing gevent and guidance in constructing this tutorial. @@ -58,7 +59,7 @@ This page is also [available in Japanese](http://methane.github.com/gevent-tutor ## Greenlets -The primary pattern used in gevent is the Greenlet, a +The primary pattern used in gevent is the **Greenlet**, a lightweight coroutine provided to Python as a C extension module. Greenlets all run inside of the OS process for the main program but are scheduled cooperatively. 
@@ -167,11 +168,11 @@ def task(pid): print('Task %s done' % pid) def synchronous(): - for i in xrange(10): + for i in range(10): task(i) def asynchronous(): - threads = [gevent.spawn(task, i) for i in xrange(10)] + threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:') @@ -211,12 +212,13 @@ requests, depending on the load on the remote server at the time of the request.
    import gevent.monkey
     gevent.monkey.patch_socket()
     
    +import json
    +from urllib.request import urlopen
    +
     import gevent
    -import urllib2
    -import simplejson as json
     
     def fetch(pid):
    -    response = urllib2.urlopen('http://jsontime.herokuapp.com/')
    +    response = urlopen('http://jsontime.herokuapp.com/')
         result = response.read()
         json_result = json.loads(result)
         datetime = json_result['datetime']
    @@ -252,34 +254,34 @@ multiprocessing pool and compare its results to the one of a gevent pool.
     
     
     import time
    -
    -def echo(i):
    -    time.sleep(0.001)
    -    return i
    -
    -# Non Deterministic Process Pool
    -
     from multiprocessing.pool import Pool
     
    -p = Pool(10)
    -run1 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run2 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run3 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run4 = [a for a in p.imap_unordered(echo, xrange(10))]
    +from gevent.pool import Pool as GPool
     
    -print(run1 == run2 == run3 == run4)
     
    -# Deterministic Gevent Pool
    -
    -from gevent.pool import Pool
    +def echo(i):
    +    time.sleep(0.001)
    +    return i
     
    -p = Pool(10)
    -run1 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run2 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run3 = [a for a in p.imap_unordered(echo, xrange(10))]
    -run4 = [a for a in p.imap_unordered(echo, xrange(10))]
     
    -print(run1 == run2 == run3 == run4)
    +if __name__ == '__main__':
    +    # Non Deterministic Process Pool
    +    with Pool(processes=10) as pool:
    +        run1 = [a for a in pool.imap_unordered(echo, range(10))]
    +        run2 = [a for a in pool.imap_unordered(echo, range(10))]
    +        run3 = [a for a in pool.imap_unordered(echo, range(10))]
    +        run4 = [a for a in pool.imap_unordered(echo, range(10))]
    +
    +    print(run1 == run2 == run3 == run4)
    +
    +    # Deterministic Gevent Pool
    +    p = GPool(10)
    +    run1 = [a for a in p.imap_unordered(echo, range(10))]
    +    run2 = [a for a in p.imap_unordered(echo, range(10))]
    +    run3 = [a for a in p.imap_unordered(echo, range(10))]
    +    run4 = [a for a in p.imap_unordered(echo, range(10))]
    +
    +    print(run1 == run2 == run3 == run4)
     
     
    @@ -429,11 +431,11 @@ print(loser.exception) ## Program Shutdown Greenlets that fail to yield when the main program receives a -SIGQUIT may hold the program's execution longer than expected. +SIGINT may hold the program's execution longer than expected. This results in so called "zombie processes" which need to be killed from outside of the Python interpreter. -A common pattern is to listen SIGQUIT events on the main program +A common pattern is to listen for SIGINT events on the main program and to invoke ``gevent.kill`` or ``gevent.killall`` before exit.
    @@ -445,7 +447,7 @@ def run_forever():
     
     if __name__ == '__main__':
         thread = gevent.spawn(run_forever)
    -    gevent.signal(signal.SIGQUIT, gevent.kill, thread)
    +    gevent.signal_handler(signal.SIGINT, gevent.kill, thread)
         thread.join()
     
     
    @@ -629,7 +631,8 @@ def main(): gevent.spawn(waiter) ]) -if __name__ == '__main__': main() +if __name__ == '__main__': + main()
    @@ -692,7 +695,7 @@ def worker(n): print('Quitting time!') def boss(): - for i in xrange(1,25): + for i in range(1,25): tasks.put_nowait(i) gevent.spawn(boss).join() @@ -744,12 +747,12 @@ def boss(): free since the maxsize of the task queue is 3. """ - for i in xrange(1,10): + for i in range(1,10): print('Assigned work %s in iteration 1' % (i)) tasks.put(i) print('Assigned all work in iteration 1') - for i in xrange(10,20): + for i in range(10,20): print('Assigned work %s in iteration 2' % (i)) tasks.put(i) print('Assigned all work in iteration 2') @@ -774,7 +777,7 @@ import gevent from gevent.pool import Group def talk(msg): - for i in xrange(3): + for i in range(3): print(msg) g1 = gevent.spawn(talk, 'bar') @@ -808,7 +811,7 @@ def hello_from(n): print('Size of group %s' % len(group)) print('Hello from Greenlet %s' % id(getcurrent())) -group.map(hello_from, xrange(3)) +group.map(hello_from, range(3)) def intensive(n): @@ -818,13 +821,13 @@ def intensive(n): print('Ordered') ogroup = Group() -for i in ogroup.imap(intensive, xrange(3)): +for i in ogroup.imap(intensive, range(3)): print(i) print('Unordered') igroup = Group() -for i in igroup.imap_unordered(intensive, xrange(3)): +for i in igroup.imap_unordered(intensive, range(3)): print(i) ]]] @@ -844,7 +847,7 @@ pool = Pool(2) def hello_from(n): print('Size of pool %s' % len(pool)) -pool.map(hello_from, xrange(3)) +pool.map(hello_from, range(3)) ]]] [[[end]]] @@ -908,8 +911,8 @@ def worker2(n): print('Worker %i released semaphore' % n) pool = Pool() -pool.map(worker1, xrange(0,2)) -pool.map(worker2, xrange(3,6)) +pool.map(worker1, range(0,2)) +pool.map(worker2, range(3,6)) ]]] [[[end]]] @@ -991,8 +994,7 @@ def application(environ, start_response): WSGIServer(('', 8000), application).serve_forever() - - +
    Flask's system is a bit more sophisticated than this example, but the @@ -1020,6 +1022,7 @@ sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True) out, err = sub.communicate() g.kill() print(out.rstrip()) +
    @@ -1030,7 +1033,7 @@ cron
     cron
     cron
     Linux
    -
    +
     
    Many people also want to use ``gevent`` and ``multiprocessing`` together. One of @@ -1054,17 +1057,17 @@ a, b = Pipe() c, d = Pipe() def relay(): - for i in xrange(10): + for i in range(10): msg = b.recv() c.send(msg + " in " + str(i)) def put_msg(): - for i in xrange(10): + for i in range(10): wait_write(a.fileno()) a.send('hi') def get_msg(): - for i in xrange(10): + for i in range(10): wait_read(d.fileno()) print(d.recv()) @@ -1113,10 +1116,11 @@ one very simply using a Queue inside of a subclassed Greenlet.
     import gevent
    +from gevent import Greenlet
     from gevent.queue import Queue
     
     
    -class Actor(gevent.Greenlet):
    +class Actor(Greenlet):
     
         def __init__(self):
             self.inbox = Queue()
    @@ -1135,15 +1139,7 @@ class Actor(gevent.Greenlet):
                 message = self.inbox.get()
                 self.receive(message)
     
    -
    -
    - -In a use case: - -
    -import gevent
    -from gevent.queue import Queue
    -from gevent import Greenlet
    +# use case
     
     class Pinger(Actor):
         def receive(self, message):
    @@ -1151,12 +1147,14 @@ class Pinger(Actor):
             pong.inbox.put('ping')
             gevent.sleep(0)
     
    +
     class Ponger(Actor):
         def receive(self, message):
             print(message)
             ping.inbox.put('pong')
             gevent.sleep(0)
     
    +
     ping = Pinger()
     pong = Ponger()
     
    @@ -1164,7 +1162,7 @@ ping.start()
     pong.start()
     
     ping.inbox.put('start')
    -gevent.joinall([ping, pong])
    +gevent.joinall([ping, pong], timeout=2)
     
     
    @@ -1198,7 +1196,7 @@ def server(): server_socket.bind("tcp://127.0.0.1:5000") for request in range(1,10): - server_socket.send("Hello") + server_socket.send(b"Hello") print('Switched to Server for %s' % request) # Implicit context switch occurs here server_socket.recv() @@ -1212,7 +1210,7 @@ def client(): client_socket.recv() print('Switched to Client for %s' % request) # Implicit context switch occurs here - client_socket.send("World") + client_socket.send(b"World") publisher = gevent.spawn(server) client = gevent.spawn(client) @@ -1231,12 +1229,14 @@ gevent.joinall([publisher, client]) from gevent.server import StreamServer + def handle(socket, address): - socket.send("Hello from a telnet!\n") + socket.send(b"Hello from a telnet!\n") for i in range(5): - socket.send(str(i) + '\n') + socket.send('{}\n'.format(i).encode()) socket.close() + server = StreamServer(('127.0.0.1', 5000), handle) server.serve_forever() @@ -1327,12 +1327,10 @@ WSGIServer(('', 8000), application).serve_forever() But regardless, performance on Gevent servers is phenomenal -compared to other Python servers. libev is a very vetted technology +compared to other Python servers. libev/libuv is a very vetted technology and its derivative servers are known to perform well at scale. -To benchmark, try Apache Benchmark ``ab`` or see this -[Benchmark of Python WSGI Servers](http://nichol.as/benchmark-of-python-web-servers) -for comparison with other servers. +To benchmark, try Apache Benchmark ``ab`` or [locust](https://locust.io) for comparison with other servers.
     $ ab -n 10000 -c 100 http://127.0.0.1:8000/
    @@ -1343,17 +1341,18 @@ for comparison with other servers.
     
     
     import gevent
    -from gevent.queue import Queue, Empty
     from gevent.pywsgi import WSGIServer
    -import simplejson as json
    +from gevent.queue import Queue, Empty
     
     data_source = Queue()
     
    +
     def producer():
         while True:
             data_source.put_nowait('Hello World')
             gevent.sleep(1)
     
    +
     def ajax_endpoint(environ, start_response):
         status = '200 OK'
         headers = [
    @@ -1365,7 +1364,7 @@ def ajax_endpoint(environ, start_response):
         while True:
             try:
                 datum = data_source.get(timeout=5)
    -            yield json.dumps(datum) + '\n'
    +            yield '{}\n'.format(json.dumps(datum)).encode()
             except Empty:
                 pass
     
    @@ -1446,8 +1445,8 @@ HTML Page:
     
     ## Chat Server
     
    -The final motivating example, a realtime chat room. This example
    -requires Flask ( but not necessarily so, you could use Django,
    +In this example we will build a realtime chat room. It requires
    +Flask ( but not necessarily so, you could use Django,
     Pyramid, etc ). The corresponding Javascript and HTML files can
     be found here.
     
    @@ -1456,13 +1455,12 @@ be found here.
     # Micro gevent chatroom.
     # ----------------------
     
    -from flask import Flask, render_template, request
    +import json
     
    +from flask import Flask, render_template, request
     from gevent import queue
     from gevent.pywsgi import WSGIServer
     
    -import simplejson as json
    -
     app = Flask(__name__)
     app.debug = True
     
    @@ -1546,3 +1544,205 @@ if __name__ == "__main__":
         http.serve_forever()
     
     
    + + +## HTTP2 Server + +In this last example, we will create an [HTTP/2](https://tools.ietf.org/html/rfc7540) server +(HTTP/2 is the next generation of the HTTP protocol) which will serve static files to a browser. +The files are served from a specific folder if one is specified; if not, they will be served from the +current folder where you run the server. To test this example, you will need **python3.6+** and +the excellent sans-io library **[h2](https://python-hyper.org/projects/h2/en/stable/)**. +To generate the private key `localhost.key` and the self-signed certificate `localhost.crt` +used in the following code you will need the OpenSSL library. If you don't know how to use it, +you can check this +**[tutorial](https://www.digitalocean.com/community/tutorials/openssl-essentials-working-with-ssl-certificates-private-keys-and-csrs)**. + +
    +import mimetypes
    +import sys
    +from functools import partial
    +from pathlib import Path
    +from typing import Tuple, Dict
    +
    +from gevent import socket, ssl
    +from gevent.event import Event
    +from gevent.server import StreamServer
    +from h2 import events
    +from h2.config import H2Configuration
    +from h2.connection import H2Connection
    +
    +
    +def get_http2_tls_context() -> ssl.SSLContext:
    +    ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    +    # RFC 7540 Section 9.2: Implementations of HTTP/2 MUST use TLS version 1.2
    +    # or higher. Disable TLS 1.1 and lower.
    +    ctx.options |= (
    +            ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    +    )
    +    # RFC 7540 Section 9.2.1: A deployment of HTTP/2 over TLS 1.2 MUST disable
    +    # compression.
    +    ctx.options |= ssl.OP_NO_COMPRESSION
    +    ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
    +    ctx.load_cert_chain(certfile='localhost.crt', keyfile='localhost.key')
    +    ctx.set_alpn_protocols(['h2'])
    +    try:
    +        ctx.set_npn_protocols(['h2'])
    +    except NotImplementedError:
    +        pass
    +
    +    return ctx
    +
    +
    +class H2Worker:
    +
    +    def __init__(self, sock: socket, address: Tuple[str, str], source_dir: str = None):
    +        self._sock = sock
    +        self._address = address
    +        self._flow_control_events: Dict[int, Event] = {}
    +        self._server_name = 'gevent-h2'
    +        self._connection: H2Connection = None
    +        # The maximum amount of a file we'll send in a single DATA frame
    +        self._read_chunk_size = 8192
    +
    +        self._check_sources_dir(source_dir)
    +        self._sources_dir = source_dir
    +
    +        self._run()
    +
    +    def _initiate_connection(self):
    +        h2_config = H2Configuration(client_side=False, header_encoding='utf-8')
    +        self._connection = H2Connection(h2_config)
    +        self._connection.initiate_connection()
    +        self._sock.sendall(self._connection.data_to_send())
    +
    +    @staticmethod
    +    def _check_sources_dir(sources_dir: str) -> None:
    +        p = Path(sources_dir)
    +        if not p.is_dir():
    +            raise NotADirectoryError(f'{sources_dir} does not exists')
    +
    +    def _send_error_response(self, status_code: str,
    +                             event: events.RequestReceived) -> None:
    +        self._connection.send_headers(
    +            stream_id=event.stream_id,
    +            headers=[
    +                (':status', status_code),
    +                ('content-length', '0'),
    +                ('server', self._server_name),
    +            ],
    +            end_stream=True
    +        )
    +        self._sock.sendall(self._connection.data_to_send())
    +
    +    def _handle_request(self, event: events.RequestReceived) -> None:
    +        headers = dict(event.headers)
    +        if headers[':method'] != 'GET':
    +            self._send_error_response('405', event)
    +            return
    +
    +        file_path = Path(self._sources_dir) / headers[':path'].lstrip('/')
    +        if not file_path.is_file():
    +            self._send_error_response('404', event)
    +            return
    +
    +        self._send_file(file_path, event.stream_id)
    +
    +    def _send_file(self, file_path: Path, stream_id: int) -> None:
    +        """
    +        Send a file, obeying the rules of HTTP/2 flow control.
    +        """
    +        file_size = file_path.stat().st_size
    +        content_type, content_encoding = mimetypes.guess_type(str(file_path))
    +        response_headers = [
    +            (':status', '200'),
    +            ('content-length', str(file_size)),
    +            ('server', self._server_name)
    +        ]
    +        if content_type:
    +            response_headers.append(('content-type', content_type))
    +        if content_encoding:
    +            response_headers.append(('content-encoding', content_encoding))
    +
    +        self._connection.send_headers(stream_id, response_headers)
    +        self._sock.sendall(self._connection.data_to_send())
    +
    +        with file_path.open(mode='rb', buffering=0) as f:
    +            self._send_file_data(f, stream_id)
    +
    +    def _send_file_data(self, file_obj, stream_id: int) -> None:
    +        """
    +        Send the data portion of a file. Handles flow control rules.
    +        """
    +        while True:
    +            while self._connection.local_flow_control_window(stream_id) < 1:
    +                self._wait_for_flow_control(stream_id)
    +
    +            chunk_size = min(
    +                self._connection.local_flow_control_window(stream_id), self._read_chunk_size
    +                )
    +            data = file_obj.read(chunk_size)
    +            keep_reading = (len(data) == chunk_size)
    +
    +            self._connection.send_data(stream_id, data, not keep_reading)
    +            self._sock.sendall(self._connection.data_to_send())
    +
    +            if not keep_reading:
    +                break
    +
    +    def _wait_for_flow_control(self, stream_id: int) -> None:
    +        """
    +        Blocks until the flow control window for a given stream is opened.
    +        """
    +        event = Event()
    +        self._flow_control_events[stream_id] = event
    +        event.wait()
    +
    +    def _handle_window_update(self, event: events.WindowUpdated) -> None:
    +        """
    +        Unblock streams waiting on flow control, if needed.
    +        """
    +        stream_id = event.stream_id
    +
    +        if stream_id and stream_id in self._flow_control_events:
    +            g_event = self._flow_control_events.pop(stream_id)
    +            g_event.set()
    +        elif not stream_id:
    +            # Need to keep a real list here to use only the events present at this time.
    +            blocked_streams = list(self._flow_control_events.keys())
    +            for stream_id in blocked_streams:
    +                g_event = self._flow_control_events.pop(stream_id)
    +                g_event.set()
    +
    +    def _run(self) -> None:
    +        self._initiate_connection()
    +
    +        while True:
    +            data = self._sock.recv(65535)
    +            if not data:
    +                break
    +
    +            h2_events = self._connection.receive_data(data)
    +            for event in h2_events:
    +                if isinstance(event, events.RequestReceived):
    +                    self._handle_request(event)
    +                elif isinstance(event, events.DataReceived):
    +                    self._connection.reset_stream(event.stream_id)
    +                elif isinstance(event, events.WindowUpdated):
    +                    self._handle_window_update(event)
    +
    +            data_to_send = self._connection.data_to_send()
    +            if data_to_send:
    +                self._sock.sendall(data_to_send)
    +
    +
    +if __name__ == '__main__':
    +    files_dir = sys.argv[1] if len(sys.argv) > 1 else f'{Path().cwd()}'
    +    server = StreamServer(('127.0.0.1', 8080), partial(H2Worker, source_dir=files_dir),
    +                          ssl_context=get_http2_tls_context())
    +    try:
    +        server.serve_forever()
    +    except KeyboardInterrupt:
    +        server.close()
    +
    +