diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 0000000..db24720 --- /dev/null +++ b/.codecov.yml @@ -0,0 +1 @@ +comment: off diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..8d8234d --- /dev/null +++ b/.editorconfig @@ -0,0 +1,16 @@ +[*] +insert_final_newline = true +trim_trailing_whitespace = true +end_of_line = lf +charset = utf-8 +indent_style = space + +[*.py] +indent_size = 4 +max_line_length = 88 + +[*.{json,yml,html}] +indent_size = 2 + +[Makefile] +indent_style = tab diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..a648ec3 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @daniel-ruiz @EmilioCarrion @mgarcia0094 @AdamjGoddard diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..f45a85c --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,11 @@ +### :tophat: What? + +Provide a description of what has been implemented. + +### :thinking: Why? + +Give an explanation of why. + +### :link: Related issue + +Add related issue's number. Example: Fix #1 diff --git a/.gitignore b/.gitignore index 4176808..5653318 100644 --- a/.gitignore +++ b/.gitignore @@ -10,9 +10,13 @@ /dist/ /*.egg-info/ /env/ +/venv/ MANIFEST coverage.* +docs/_build !.gitignore !.travis.yml -!.isort.cfg +!.editorconfig +!.github +!.codecov.yml diff --git a/.readthedocs.yml b/.readthedocs.yml new file mode 100644 index 0000000..2dd3bc2 --- /dev/null +++ b/.readthedocs.yml @@ -0,0 +1,21 @@ +# .readthedocs.yml +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Build documentation in the docs/ directory with Sphinx +sphinx: + configuration: docs/conf.py + +# Build documentation with MkDocs +#mkdocs: +# configuration: mkdocs.yml + +# Optionally build your docs in additional formats such as PDF and ePub +formats: all + +python: + install: + - requirements: requirements/base.txt diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..ec500db --- /dev/null +++ b/.travis.yml @@ -0,0 +1,28 @@ +dist: bionic +sudo: false +notifications: + email: false + +services: + - postgresql +addons: + postgresql: "9.6" + +language: python +python: + - "3.6" + - "3.10" + +install: + - make install-requirements + - make install-django-requirements + - make install-test-requirements + +before_script: + - make lint + - psql -c 'create database travis_ci_test;' -U postgres + +script: + - make coverage + - codecov + diff --git a/AUTHORS.md b/AUTHORS.md new file mode 100644 index 0000000..7002c54 --- /dev/null +++ b/AUTHORS.md @@ -0,0 +1,18 @@ +Thank you to all the contributors to Relé! + +* Andrew Graham-Yooll (@andrewgy8) +* Miquel Torres Barcelo (@tobami) +* Edgar Latorre (@edgarlatorre) +* Lluis Guirado (@LluisGuiVi) +* Daniel Ruiz (@dani-ruiz) +* Juanjo Ponz (@jjponz) +* Jose Antonio Navarro (@jonasae) +* Antonio Bustos Rodriguez (@antoniobusrod) +* David de la Iglesia (@ddelaiglesia) for the Logo! +* Santiago Lanús (@sanntt) +* Craig Mulligan (@hobochild) +* Daniel Demmel (@daaain) +* Luis Garcia Cuesta (@luisgc93) +* Chirag Jain (@chirgjin-les) +* Jordi Chulia (@jorchube) +* Emilio Carrión (@EmilioCarrion) diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index eb21dac..0000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,6 +0,0 @@ -Changelog -========= - -`0.2.0` (2019-05-09) - -* Initial version diff --git a/CHANGELOG.rst b/CHANGELOG.rst new file mode 100644 index 0000000..49926d4 --- /dev/null +++ b/CHANGELOG.rst @@ -0,0 +1,193 @@ +Changelog +========= + +1.6.0 (2022-08-03) +------------------- +* [Added] Implement auto restart of the consumption when futures are done or cancelled. (#226) + +1.5.0 (2022-04-20) +------------------- +* [Added] Add filter expressions to subscriptions. (#207) + +1.4.1 (2022-04-19) +------------------- +* [Modified] Fixed bug in the post-publish-failure VerboseLoggingMiddleware hook. (#220) + +1.4.0 (2022-04-13) +------------------- +* [Added] Added a VerboseLoggingMiddleware that does not truncate mesage payload. (#218) + +1.3.0 (2022-04-04) +------------------- +* GC Project Id & Windows support (#215) + +1.2.0 (2021-12-10) +------------------- +* [CHANGED] TimeotError from publisher (#212) +* Added filter_subs_by setting in documentation (#208) +* Automatic topic creation (#206) +* Log post publish success (#204) + +1.1.1 (2021-6-28) +------------------- +* Do not define default_app_config, it's deprecated (#199) +* Do not implement deprecated middlewares in the base class (#200) + +1.1.0 (2021-3-10) +------------------- +* Google Pubsub 2.0 Compat (#192) +* Add validations to the sub decorator (#189) +* Add new post_publish_hook and deprecate the old one (#190) +* Discover and load settings when publishing (#188) +* Fix #180: Raise error when the config loads a repeated subscription (#187) + +1.0.0 (2020-9-25) +------------------- +* BREAKING: Remove GC_PROJECT_ID (#183) + +0.14.0 (2020-8-5) +------------------- +* BREAKING: Remove GC_CREDENTIALS (#174) +* Add changelog to the docs site (#179) +* Catch TimeoutError and run post_publish_failure when blocking (#172) +* Deprecate GC_PROJECT_ID setting (#178) + +0.13.0 (2020-7-9) +------------------- +* Add documentation for class based subscriptions (#169) +* Deprecate GC_CREDENTIALS setting (#173) +* GC_CREDENTIALS_PATH setting option (#170) + +0.13.dev0 (2020-6-16) +--------------------- +* Traverse all packages to autodiscover all subs.py modules (#167) +* Auto-discovery of class based subscriptions (#168) + +0.12.0 (2020-6-12) +------------------- +* Added ``--settings`` path option in CLI (#166) +* Added isort linting (#164) + +0.11.0 (2020-6-4) +------------------- +* CLI feature (#160) +* Documentation Enhancements (#158, #155, #162) +* Testing Improvements (#154, #157) + +0.10.0 (2020-2-4) +------------------- +* Adjust default THREADS_PER_SUBSCRIPTION (#152) +* Add unrecoverable_middleware (#150) +* Allow multiple filters (#148) +* Configure timeout from .publish() (#143) +* Dont crash when subscription topic does not exist (#142) + +0.9.1 (2020-1-2) +------------------- +* Ack messages when data not json serializable (#141) +* Use ThreadScheduler instead of ThreadPoolExecutor (#145) + +0.9.0 (2019-12-20) +------------------- +* Flask support via middleware (#127) +* Add message attributes to metrics log (#128) +* Specify number of threads per subscriber with Subscription ThreadPoolExecutor (#139) +* Publishing timeout while blocking (#137) +* Clean up rele.config.setup + Worker() init (#132) + +0.8.1 (2019-11-25) +------------------- +* Fix runrele command + +0.8.0 (2019-11-22) +------------------- +* Worker run method (#118) +* Add kwargs to setup method passed through to middleware (#123) +* Add missing worker middleware hooks (#121) +* Add 3.8 support +* More Documentation + +0.7.0 (2019-10-21) +------------------- +* BREAKING: Remove Django as a dependency (#95) +* More documentation + +0.6.0 (2019-09-21) +------------------- +* BREAKING: Remove drf as a dependency (#91) +* Add message as a parameter for middleware hooks (#99) +* Check setting.CONN_MAX_AGE and warn when not 0 (#97) +* More documentation + +0.5.0 (2019-08-08) +------------------- +* ``python manage.py showsubscriptions`` command +* Configurable ENCODER setting +* Move DEFAULT_ACK_DEADLINE to the RELE config +* More documentation + +0.4.1 (2019-06-18) +------------------- +* Ability to install app only with rele +* Define default filter_by in settings.RELE + +0.4.0 (2019-06-17) +------------------- + +* Set ``DEFAULT_ACK_DEADLINE`` (#49) +* Filter by message attributes (#66) +* BREAKING: All Relé settings are defined in a dict (#60) + +Old structure: + +.. code:: python + + from google.oauth2 import service_account + RELE_GC_CREDENTIALS = service_account.Credentials.from_service_account_file( + 'rele/settings/dummy-credentials.json' + ) + RELE_GC_PROJECT_ID = 'dummy-project-id' + +New structure: + +.. code:: python + + from google.oauth2 import service_account + RELE = { + 'GC_CREDENTIALS': service_account.Credentials.from_service_account_file( + 'rele/settings/dummy-credentials.json' + ), + 'GC_PROJECT_ID': 'dummy-project-id', + 'MIDDLEWARE': [ + 'rele.contrib.LoggingMiddleware', + 'rele.contrib.DjangoDBMiddleware', + ], + 'SUB_PREFIX': 'mysubprefix', + 'APP_NAME': 'myappname', + } + +* ``rele.contrib.middleware`` (#55) +* Prefix argument in sub decorator (#47) +* Add timestamp to the published message (#42) +* BREAKING: Explicit publisher and subscriber configuration (#43) +* Sphinx documentation (#27, #34, #40, #41) +* Contributing guidelines (#32) + +0.3.1 (2019-06-04) +------------------- + +* Add prometheus metrics key to logs (#16 - #20, #22, #23) +* Fix JSON serialization when publishing (#25) + +0.3.0 (2019-05-14) +------------------- + +* Ability to run in emulator mode (#12) +* Add Travis-CI builds (#10) +* More friendly global publish (#11) +* Non-blocking behaviour when publishing by default (#6) + +0.2.0 (2019-05-09) +------------------- + +* Initial version diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..9a85b29 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,77 @@ +# Contributor Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in our project and +our community a harassment-free experience for everyone, regardless of age, body +size, disability, ethnicity, sex characteristics, gender identity and expression, +level of experience, education, socio-economic status, nationality, personal +appearance, race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies within all project spaces, and it also applies when +an individual is representing the project or its community in public spaces. +Examples of representing a project or community include using an official +project e-mail address, posting via an official social media account, or acting +as an appointed representative at an online or offline event. Representation of +a project may be further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team at mercadonaonline@mercadona.es. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an incident. +Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq + diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..e081eb0 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,38 @@ +# Contributing + +Thank you for contributing to Relé! + +To make the experience as easy as possible, we ask that you follow some simple +guidelines. Use your best judgment, and feel free to propose changes to this document +in a pull request. + +Additionally, be sure that you read and follow our +[Code of Conduct](https://github.com/mercadona/rele/blob/master/CODE_OF_CONDUCT.md). + +## Code + +We ask that you start a discussion before attempting to make a code contribution. For feature +requests, issues, bugs, etc. please +[create an issue via Github](https://github.com/mercadona/rele/issues/new) where we can all +discuss the contribution to the project. + +It is always best to have community input before proposing changes that may later be rejected. + +### Pull Requests + +* Make sure any code changes are covered by tests by running `make test`. +* Run `make lint` on any modified files. +* If your branch is behind master, +[rebase](https://github.com/edx/edx-platform/wiki/How-to-Rebase-a-Pull-Request) on top of it. +* Include the related issue's number so that Github closes _automagically_ +when the PR is merged. Example: `Fix #12` +* Add yourself to the [AUTHORS](./AUTHORS.md) file + +## Issues + +When you open an issue make sure you include the full stack trace and +that you list all pertinent information (operating system, Python implementation, etc.) +as part of the issue description. + +Please include a minimal, reproducible test case with every bug +report. diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..70b5386 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.8-buster + +SHELL ["/bin/bash", "-o", "pipefail", "-c"] +WORKDIR /rele +LABEL python_version=python + +COPY . . + +RUN make install-dev-requirements + +CMD ["make", "clean", "lint", "test"] diff --git a/LICENSE b/LICENSE index 261eeb9..9682a83 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2019-Present Mercadona S.A. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/Makefile b/Makefile index 9e5b3a7..042b976 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,11 @@ -.PHONY: clean-pyc clean-build docs help +.PHONY: clean-pyc clean-build release docs help +.PHONY: lint test coverage test-codecov .DEFAULT_GOAL := help help: @grep '^[a-zA-Z]' $(MAKEFILE_LIST) | sort | awk -F ':.*?## ' 'NF==2 {printf "\033[36m %-25s\033[0m %s\n", $$1, $$2}' -clean: clean-build clean-pyc +clean: clean-build clean-pyc clean-tests clean-build: ## remove build artifacts rm -fr build/ @@ -16,25 +17,54 @@ clean-pyc: ## remove Python file artifacts find . -name '*.pyo' -exec rm -f {} + find . -name '*~' -exec rm -f {} + +clean-tests: ## remove pytest artifacts + rm -fr .pytest_cache/ + rm -fr htmlcov/ + lint: ## check style with flake8 - flake8 pubsub tests + black . --check --diff + isort --check-only test: ## run tests quickly with the default Python python runtests.py tests -test-all: ## run tests on every Python version with tox - tox - -coverage: ## check code coverage quickly with the default Python - coverage run --source django_pubsy runtests.py tests +coverage: ## generates codecov report + coverage run --source rele runtests.py tests coverage report -m - coverage html - open htmlcov/index.html -release: clean ## package and upload a release - python setup.py sdist upload - python setup.py bdist_wheel upload +release: clean install-deploy-requirements sdist ## package and upload a release + twine upload -u mercadonatech dist/* sdist: clean ## package python setup.py sdist ls -l dist + +install-requirements: ## install package requirements + pip install -r requirements/base.txt + +install-test-requirements: ## install requirements for testing + pip install -r requirements/test.txt + +install-deploy-requirements: ## install requirements for deployment + pip install -r requirements/deploy.txt + +install-docs-requirements: ## install requirements for documentation + pip install -r requirements/docs.txt + +install-django-requirements: ## install django requirements + pip install -r requirements/django.txt + +install-dev-requirements: install-requirements install-test-requirements install-docs-requirements install-django-requirements + +build-html-doc: ## builds the project documentation in HTML format + DJANGO_SETTINGS_MODULE=tests.settings make html -C docs + open docs/_build/html/index.html + +docker-build: + docker build -t rele . + +docker-test: docker-build + docker run -it --rm --name rele rele + +docker-shell: docker-build + docker run -it --rm --name rele --volume ${PWD}:/rele rele /bin/bash diff --git a/README.md b/README.md index 6d2d6d9..f0d6a49 100644 --- a/README.md +++ b/README.md @@ -1,84 +1,92 @@ -# Relé +

+ +

+ +

+ + Relé makes integration with Google PubSub straightforward and easy. + +

+ +

+ + Build Status + + + Read the Docs + + + Code Coverage + + + PyPI - Python Version + + + PyPI - Downloads + +

-Relé makes integration with Google PubSub easier and is ready to integrate seamlessly into any Django project. ## Motivation and Features -The [Publish–subscribe pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) and specifically the Google Cloud [PubSub library](https://pypi.org/project/google-cloud-pubsub/) are very powerful tools but you can easily cut your fingers on it. Relé makes integration seamless by providing Publisher, Subscriber and Worker classes with the following features: +The [Publish-Subscribe pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) +and specifically the Google Cloud [Pub/Sub library](https://pypi.org/project/google-cloud-pubsub/) +are very powerful tools but you can easily cut your fingers on it. Relé makes integration +seamless by providing Publisher, Subscriber and Worker classes with the following features: -* A Publisher: - * Singleton: Ensures there is no memory leak when instantiating a `PublisherClient` every time you publish will result in a memory leak because the transport is not closed by the Google library. - * Automatic building of topic paths -* A `sub` decorator to declare subscribers: - * Super easy declaration of subscribers -* A Worker: - * In-built DB connection management so open connections don't increase over time -* A `python manage.py runrele` management command - * Automatic creation of Subscriptions - * Subscription auto-discovery +* Powerful Publishing API +* Highly Scalable Worker +* Intuitive Subscription Management +* Easily Extensible Middleware +* Ready to go Django/Flask integration +* CLI +* And much more! -## What's in the name +## What it looks like -"Relé" is Spanish for *relay*, a technology that [has played a key role](https://technicshistory.wordpress.com/2017/01/29/the-relay/) in history in the evolution of communication and electrical technology, including the telegraph, telephone, electricity transmision, and transistors. +```python +# Publish to the topic +import rele -## Quickstart +rele.publish(topic='photo-uploaded', data={'customer_id': 123}) -Add it to your `INSTALLED_APPS`: +# Subscribe to the Pub/Sub topic +from rele import sub -```python -INSTALLED_APPS = ( - 'rele', -) +@sub(topic='photo-uploaded') +def photo_uploaded(data, **kwargs): + print(f"Customer {data['customer_id']} has uploaded an image") ``` -You'll also need to set up two variables with the Google Cloud credentials: -`RELE_GC_CREDENTIALS` and `RELE_GC_PROJECT_ID`. - -NOTE: Ensure that [`CONN_MAX_AGE`](https://docs.djangoproject.com/en/2.2/ref/settings/#conn-max-age) -is set to 0 in your worker. The Django default value is 0. - -In other words, the environment where you run `python manage.py runrele`, -make sure `CONN_MAX_AGE` is not set explicitly. +## What's in the name -## Usage +"Relé" is Spanish for *relay*, a technology that +[has played a key role](https://technicshistory.wordpress.com/2017/01/29/the-relay/) in +history in the evolution of communication and electrical technology, including the telegraph, +telephone, electricity transmission, and transistors. -```python -# your_app.subs.py +## Install -from rele import Publisher, sub +Relé supports Python 3.6+ and installing via ``pip`` -def publish(): - client = Publisher() - client.publish(topic='lets-tell-everyone', - data={'foo': 'bar'}, - myevent='arrival') +`pip install rele` -@sub(topic='lets-tell-everyone') -def sub_function(data, **kwargs): - event = kwargs.get('myevent') - print(f'I am a task doing stuff with the newest event: {event}') -``` +or with Django integration -### Subscription `suffix` +`pip install rele[django]` -In order for multiple subscriptions to consume from the same topic, you'll want to add -a unique suffix to the subscriptions, so they both listen to all the gossip going around. +or with Flask integration -```python -@sub(topic='lets-tell-everyone', suffix='sub1') -def sub_1(data, **kwargs): - pass +`pip install rele[flask]` -@sub(topic='lets-tell-everyone', suffix='sub2') -def sub_2(data, **kwargs): - pass -``` +## Quickstart -### Running the worker in a process +[Please see our documentation to get started.](https://mercadonarele.readthedocs.io/en/latest/guides/basics.html) -In your worker, you can run `python manage.py runrele`. Once subscribed to -the topic, in another process you can run the `publish` function. Your subscription process -should print out the message. +You can also read more about it [here](https://medium.com/mercadona-tech/announcing-rel%C3%A9-c2d0540af3b9) ---- @@ -86,4 +94,4 @@ should print out the message. Does the code actually work? - pytest + make test diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d4bb2cb --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/_static/rele_logo.png b/docs/_static/rele_logo.png new file mode 100644 index 0000000..ca426ad Binary files /dev/null and b/docs/_static/rele_logo.png differ diff --git a/docs/_static/style.css b/docs/_static/style.css new file mode 100644 index 0000000..b70d494 --- /dev/null +++ b/docs/_static/style.css @@ -0,0 +1,5 @@ +@import url('https://fonts.googleapis.com/css?family=Heebo&display=swap'); + +body { + font-family: 'Heebo'; +} diff --git a/docs/_templates/sidebar.html b/docs/_templates/sidebar.html new file mode 100644 index 0000000..b7488f1 --- /dev/null +++ b/docs/_templates/sidebar.html @@ -0,0 +1,10 @@ + +

+ +

diff --git a/docs/changelog.rst b/docs/changelog.rst new file mode 100644 index 0000000..565b052 --- /dev/null +++ b/docs/changelog.rst @@ -0,0 +1 @@ +.. include:: ../CHANGELOG.rst diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..c34eab0 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,81 @@ +# Configuration file for the Sphinx documentation builder. +# +# This file only contains a selection of the most common options. For a full +# list see the documentation: +# http://www.sphinx-doc.org/en/master/config + +# -- Path setup -------------------------------------------------------------- + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# +import os +import sys + + +sys.path.insert(0, os.path.abspath("..")) + +import rele # noqa + +# -- Project information ----------------------------------------------------- + +project = "Relé" +copyright = "2019-2020, Mercadona S.A." +author = "Mercadona" +version = rele.__version__ + + +# -- General configuration --------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = ["sphinx.ext.autodoc", "sphinx.ext.doctest"] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ["_templates"] + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] + + +# -- Options for HTML output ------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = "alabaster" +html_theme_options = { + "font_family": "Heebo", + "logo_name": False, + "code_font_family": ( + '"SFMono-Regular", Consolas, "Liberation Mono", Menlo, Courier,' " monospace" + ), + "code_font_size": "0.8em", + "show_related": False, + "fixed_sidebar": False, + "github_banner": False, + "github_button": True, + "github_type": "star", + "github_user": "Mercadona", + "github_repo": "rele", +} +master_doc = "index" + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ["_static"] + +html_sidebars = {"**": ["sidebar.html", "navigation.html", "searchbox.html"]} + + +# Setup function +def setup(app): + app.add_css_file("style.css") + + +# -- Extension configuration ------------------------------------------------- diff --git a/docs/guides/basics.rst b/docs/guides/basics.rst new file mode 100644 index 0000000..e0c322d --- /dev/null +++ b/docs/guides/basics.rst @@ -0,0 +1,138 @@ +.. _basics: + +First Steps +=========== + + +Configuration +_____________ + +In order to get started using Relé, we must have a PubSub topic in which to publish. +Via the `Google Cloud Console `_ +we create one, named ``photo-upload``. + +To authenticate our publisher and subscriber, follow the +`Google guide `_ on +how to obtain your authentication account. + +Publishing +__________ + +To configure Relé, our settings may look something like: + +.. code:: python + + # /settings.py + + RELE = { + 'GC_CREDENTIALS_PATH': 'credentials.json', + } + +.. code:: python + + # /publisher.py + + import rele + import settings # we need this for initializing the global Publisher singleton + + config = rele.config.setup(settings.RELE) + data = { + 'customer_id': 123, + 'location': '/google-bucket/photos/123.jpg' + } + + rele.publish(topic='photo-uploaded', data=data) + +To publish data, we simply pass in the topic to which we want our data to be published to, followed by +a valid json serializable Python object. + +.. note:: If you want to publish other types of objects, you may configure a custom :ref:`settings_encoder_path`. + +If you need to pass in additional attributes to the Message object, you can simply add ``kwargs``. +These must all be strings: + +.. code:: python + + rele.publish(topic='photo-uploaded', + data=data, + type='profile', + rotation='landscape') + +.. note:: Anything other than a string attribute will result in a ``TypeError``. + +.. _subscribing: + +Subscribing +___________ + +Once we can publish to a topic, we can subscribe to the topic from a worker instance. +In an app directory, we create our sub function within our ``subs.py`` file. + +.. code:: python + + # /app/subs.py + + from rele import sub + + @sub(topic='photo-uploaded') + def photo_uploaded(data, **kwargs): + print(f"Customer {data['customer_id']} has uploaded an image to our service, + and we stored it at {data['location'}.") + +Additionally, if you added message attributes to your Message, you can access them via the +``kwargs`` argument: + +.. code:: python + + @sub(topic='photo-uploaded') + def photo_uploaded(data, **kwargs): + print(f"Customer {data['customer_id']} has uploaded an image to our service, + and we stored it at {data['location'}. + It is a {kwargs['type']} picture with the + rotation {kwargs['rotation']}") + + +Message attributes +------------------ + +It might be helpful to access particular message attributes in your +subscriber. One attribute that _rele_ adds by default is ``published_at``. +To access this attribute you can use ``kwargs``. + +.. code:: python + + @sub(topic='photo-uploaded') + def photo_uploaded(data, **kwargs): + print(f"Customer {data['customer_id']} has uploaded an image to our service, + and it was published at {kwargs['published_at'}.") + + +.. _consuming: + +Consuming +_________ + +Once the sub is implemented, we can start our worker which will register the subscriber on the topic +with Google Cloud and will begin to pull the messages from the topic. + +.. code:: bash + + rele-cli run + + +In addition, if the ``settings.py`` module is not in the current directory, we can specify the +path. + +.. code:: bash + + rele-cli run --settings app.settings + + +.. note:: Autodiscovery of subscribers with ``rele-cli`` is automatic. + Any ``subs.py`` module you have in your current path, will be imported, and all subsequent decorated objects will be registered. + + | ├──settings.py + | ├──app # This can be called whatever you like + | ├────subs.py + +In another terminal session when we run ``python publisher.py``, we should see the print readout in our subscriber. diff --git a/docs/guides/django.rst b/docs/guides/django.rst new file mode 100644 index 0000000..80ab388 --- /dev/null +++ b/docs/guides/django.rst @@ -0,0 +1,58 @@ +.. _django_integration: + +Django Integration +================== + +.. note:: + This guide simply points out the differences between standalone Relé and + the Django integration. The basics about publishing and subscribing are described + in the :ref:`basics` section. + +Publishing +__________ + +To configure Relé, our settings may look something like: + +.. code:: python + + RELE = { + 'GC_CREDENTIALS_PATH': 'photo_project/settings/dummy-credentials.json', + 'MIDDLEWARE': [ + 'rele.contrib.LoggingMiddleware', + 'rele.contrib.DjangoDBMiddleware', + ], + 'APP_NAME': 'photo-imaging', + } + +The only major difference here is that we are using the ``rele.contrib.DjangoDBMiddleware``. +This is important to properly close DB connections. + +.. important:: + If you plan on having your subscriber connect to the database, it is vital that + the Django settings.CONN_MAX_AGE is set to 0. + + +Once the topic is created and our Django application has the proper configuration defined +in :ref:`settings`, we can start publishing to that topic. + + +Subscribing +___________ + +Since the Django integration comes with ``python manage.py runrele`` command, we must name the file +where we define our subscribers ``subs.py``. ``runrele`` will auto-discover all decorated +subscriber methods in a defined Django app and register/create the subscriptions for us. + +:ref:`subscribing` follows the same method as before. + + +Consuming +_________ + +Unlike what is described in :ref:`consuming`, the Django integration provides a very convenient +command. + +By running ``python manage.py runrele``, worker process will autodiscover any properly decorated ``@sub`` +function in the ``subs.py`` file and create the subscription for us. + +Once the process is up and running, we can publish and consume. diff --git a/docs/guides/emulator.rst b/docs/guides/emulator.rst new file mode 100644 index 0000000..7817d27 --- /dev/null +++ b/docs/guides/emulator.rst @@ -0,0 +1,54 @@ +Pub/Sub Emulator +================ + +It can be helpful to be able run the emulator in our development environment. +To be able to do that we can follow the steps below: + +1) Run the Google Cloud Pub/Sub emulator in the cloud-sdk container and map the port 8085. + +.. code:: bash + + $ docker pull google/cloud-sdk # Pull container + $ docker run -it --rm -p "8085:8085" google/cloud-sdk gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 + + +2) Export PUBSUB_EMULATOR_HOST environment variable to specify the emulator host. + + In case you don't want to set this variable, it will be necessary to have pub/sub credentials. + +.. code:: bash + + $ export PUBSUB_EMULATOR_HOST=localhost:8085 + + +3) Set rele settings in the Django project. + +.. code:: python + + # my_django_project/settings.py + + RELE = { + 'APP_NAME': 'my-awesome-app', + 'SUB_PREFIX': 'test', + 'GC_CREDENTIALS_PATH': 'my-credentials', + 'MIDDLEWARE': [ + 'rele.contrib.LoggingMiddleware', + 'rele.contrib.DjangoDBMiddleware', + ], + } + + +In case it's necessary to create a topic manually we can add it using the django shell. + +.. code:: bash + + python manage.py shell + +.. code:: python + + from django.conf import settings + from google.cloud import pubsub_v1 + + publisher_client = pubsub_v1.PublisherClient() + topic_path = publisher_client.topic_path(settings.RELE.get('GC_PROJECT_ID'), 'topic_name') + publisher_client.create_topic(topic_path) diff --git a/docs/guides/filters.rst b/docs/guides/filters.rst new file mode 100644 index 0000000..5ec91d0 --- /dev/null +++ b/docs/guides/filters.rst @@ -0,0 +1,72 @@ +Filtering Messages +================== + +Filter can be used to execute a subscription with specific parameters. +There are three types of filters, global, by passing a filter_by parameter in the +subscription (this applies the filter locally) or by passing a backend_filter_by +parameter in the subscription (this applies the filter on pubsub). + + +`filter_by` parameter +_____________________ + +This filter is a function that is supposed to return a boolean and this function +is passed as parameter ``filter_by`` in the subscription. + +.. code:: python + + def landscape_filter(kwargs): + return kwargs.get('type') == 'landscape' + + + # This subscription is going to be called if in the kwargs + # has a key type with value landscape + + @sub(topic='photo-updated', filter_by=landscape_filter) + def sub_process_landscape_photos(data, **kwargs): + print(f'Received a photo of type {kwargs.get("type")}') + + +`backend_filter_by` parameter +_____________________________ + +This filter is an expression that is applied to the subscription creation. This filter +expression is applied by pubsub before passing the message to the subscriber. More info +about filter expressions `here `_. + +.. note:: + Filter expressions are only applied on the subscription creation, they are not updated + if changed if you do not recreate the subscription on pubsub. + +.. code:: python + + # This subscription is going to be called if in the kwargs + # has a key type with value landscape + + @sub(topic='photo-updated', backend_filter_by='attributes:type = "landscape"') + def sub_process_landscape_photos(data, **kwargs): + print(f'Received a photo of type {kwargs.get("type")}') + + +Global Filter +_____________ + +This filter is specified in the settings with the key ``FILTER_SUBS_BY`` +that has a function as value. +In case a subscription has a filter already it's going to use it's own filter. + +.. code:: python + + import os + + def landscape_filter(kwargs): + return kwargs.get('type') == 'landscape' + + settings = { + ... + 'FILTER_SUBS_BY': landscape_filter, + } + + + + diff --git a/docs/guides/flask.rst b/docs/guides/flask.rst new file mode 100644 index 0000000..7239d94 --- /dev/null +++ b/docs/guides/flask.rst @@ -0,0 +1,51 @@ +.. _flask_integration: + +Flask Integration +================== + +.. note:: + This guide simply points out the differences between standalone Relé and + the Flask integration. The basics about publishing and consuming are described + in the :ref:`basics` section. + +Setup +__________ + +To configure Relé, our settings may look something like: + +.. code:: python + + RELE = { + 'GC_CREDENTIALS_PATH': 'photo_project/settings/dummy-credentials.json', + 'MIDDLEWARE': [ + 'rele.contrib.LoggingMiddleware', + 'rele.contrib.FlaskMiddleware', + ], + 'APP_NAME': 'photo-imaging', + } + + # Later when we setup rele and flask: + app = Flask() + rele.config.setup(RELE, flask_app=app) + +The only major difference here is that we are using the ``rele.contrib.FlaskMiddleware`` and +that we pass the Flask ``app`` instance to ``rele.config.setup`` method. + +Subscribing +____________ + +Now that that the middleware is setup our subscriptions will automatically have +`Flask's app context `_ pushed +when they are invoked so you will have access to the database connection pool and all +other app dependent utilities. + +.. code:: python + + from models import File + from database import db + + @sub(topic='photo-uploads') + def handle_upload(data, **kwargs): + new_file = File(data) + db.session.add(new_file) + db.session.commit() diff --git a/docs/guides/unrecoverable_middleware.rst b/docs/guides/unrecoverable_middleware.rst new file mode 100644 index 0000000..68b4895 --- /dev/null +++ b/docs/guides/unrecoverable_middleware.rst @@ -0,0 +1,39 @@ +.. _unrecoverable_middleware: + +Unrecoverable Middleware +======================== + +To acknowledge and ignore incompatible messages that your subscription is unable to handle, you can use the `UnrecoverableMiddleware`. + +Usage +__________ + +First make sure the middleware is included in your Relé config. + +.. code:: python + + # settings.py + import rele + from google.oauth2 import service_account + + RELE = { + 'GC_CREDENTIALSGC_CREDENTIALS_PATH': 'credentials.json', + 'MIDDLEWARE': ['rele.contrib.UnrecoverableMiddleWare'] + } + config = rele.config.setup(RELE) + +Then in your subscription handler if you encounter an incompatible message raise the `UnrecoverableException`. Your message will be `.acked()` and it will not be redelivered to your subscription. + +.. code:: python + + from rele.contrib.unrecoverable_middleware import UnrecoverableException + from rele import sub + + @sub(topic='photo-uploaded') + def photo_uploaded(data, **kwargs): + + if data.get("required_property") is None: + # Incompatible + raise UnrecoverableException("required_property is required.") + + # Handle correct messages diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..bc9b6b2 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,136 @@ +.. Rele documentation master file, created by + sphinx-quickstart on Wed Jun 5 14:19:08 2019. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to Relé's documentation! +================================ + +Release v\ |version|. (`Installation `_) + +.. image:: https://travis-ci.org/mercadona/rele.svg?branch=master + :target: https://travis-ci.org/mercadona/rele + +.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg + :target: https://github.com/mercadona/rele/blob/master/LICENSE + +------------------- + +**Relé** makes integration with Google PubSub easier and is ready to +integrate seamlessly into any Django project. + +The Publish-Subscribe pattern and specifically the Google Cloud Pub/Sub library are +very powerful tools but you can easily cut your fingers on it. Relé +makes integration seamless by providing Publisher, Subscriber and Worker +classes. + + +Features +________ + +Out of the box, Relé includes the following features: + + * Powerful Publishing API + * Highly Scalable Worker + * Intuitive Subscription Management + * Easily Extensible Middleware + * Ready to go Django/Flask integration + * CLI + * And much more... + +What It Looks Like +__________________ + +.. code:: python + + # Subscribe to the Pub/Sub topic + from rele import sub + @sub(topic='photo-uploaded') + def photo_uploaded(data, **kwargs): + print(f"Customer {data['customer_id']} has uploaded an image") + + # Publish to the topic + import rele + rele.publish(topic='photo-uploaded', data={'customer_id': 123}) + +Install +_______ + +Relé supports Python 3.6+ and installing via ``pip`` + +.. code:: + + $ pip install rele + +or with Django integration + +.. code:: + + $ pip install rele[django,flask] + +User Guides +___________ + +.. toctree:: + :maxdepth: 1 + + guides/basics + guides/django + guides/flask + guides/filters + guides/emulator + guides/unrecoverable_middleware + + +Configuration +_____________ + +Here you can see the full list of the settings options for your deployment of Relé. + +.. toctree:: + :maxdepth: 2 + + settings + + +API Docs +________ + +This is the part of documentation that details the inner workings of Relé. +If you are looking for information on a specific function, class or method, +this part of the documentation is for you. + + +.. toctree:: + :maxdepth: 2 + + reference + +Changelog +--------- + +Here you can see the full list of changes between each Relé release. + +.. toctree:: + :maxdepth: 1 + + changelog + + +Project Info +____________ + +.. toctree:: + :maxdepth: 1 + + Source Code + Contributing + Code of Conduct + License + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..2119f51 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +if "%1" == "" goto help + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/reference.rst b/docs/reference.rst new file mode 100644 index 0000000..ffcd7d9 --- /dev/null +++ b/docs/reference.rst @@ -0,0 +1,72 @@ +API Reference +============= + +.. module:: rele + +.. _ client + +Clients +------- + +.. autoclass:: rele.client.Publisher + :members: + + +.. _ publish + +.. autoclass:: rele.client.Subscriber + :members: + + +.. _ subscriber + +Publish +------- + +.. automodule:: rele.publishing + :members: + + +.. _ subscription + +Subscription +------------ + +.. automodule:: rele.subscription + :members: + + +.. _ worker + +Worker +------ + +.. automodule:: rele.worker + :members: + + +.. _ middleware + +Middleware +---------- + +Relé middleware's provide additional functionality to default behavior. Simply subclass +``BaseMiddleware`` and declare the hooks you wish to use. + +Base Middleware +--------------- + +.. autoclass:: rele.middleware.BaseMiddleware + :members: + +Logging Middleware +------------------ + +.. automodule:: rele.contrib.logging_middleware + :members: + +Django Middleware +----------------- + +.. automodule:: rele.contrib.django_db_middleware + :members: diff --git a/docs/settings.rst b/docs/settings.rst new file mode 100644 index 0000000..38f9047 --- /dev/null +++ b/docs/settings.rst @@ -0,0 +1,162 @@ +.. _settings: + +======== +Settings +======== + +.. contents:: + :local: + :depth: 1 + + +``RELE`` +-------- + +Default: ``{}`` (Empty dictionary) + +A dictionary mapping all Relé configuration settings to values defined +in your Django project's ``settings.py``. +Example:: + + RELE = { + 'GC_CREDENTIALS_PATH': 'rele/settings/dummy-credentials.json', + 'MIDDLEWARE': [ + 'rele.contrib.LoggingMiddleware', + 'rele.contrib.DjangoDBMiddleware', + ], + 'SUB_PREFIX': 'mysubprefix', + 'APP_NAME': 'myappname', + 'ENCODER_PATH': 'rest_framework.utils.encoders.JSONEncoder', + 'ACK_DEADLINE': 120, + 'PUBLISHER_TIMEOUT': 3.0, + 'FILTER_SUBS_BY': boolean_function + } + +``GC_PROJECT_ID`` +------------------ + +**Optional** + +GCP project id to use. If this is not provided then it is inferred via either +service account's project id or quota project id if using Application Default Credentials (ADC) + + +``GC_CREDENTIALS_PATH`` +----------------------- + +**Optional** + +Path to service account json file with access to PubSub + + +.. _settings_project_id: + +``MIDDLEWARE`` +------------------ + +**Optional** + +Default: ``['rele.contrib.LoggingMiddleware']`` + +List of the middleware modules that will be included in the project. The order +of execution follows FIFO. + +It is strongly recommended that for Django integration, you add:: + + [ + 'rele.contrib.LoggingMiddleware', + 'rele.contrib.DjangoDBMiddleware', + ] + +The DjangoDBMiddleware will take care of opening and closing connections to the db before +and after your callbacks are executed. If this is left out, it is highly probable that +your database will run out of connections in your connection pool. + +The LoggingMiddleware will take care of logging subscription information before and after the callback is executed. +The subscription message is only logged when an exception was raised while processing it. +If you would like to log this message in every case, you should create a middleware of your own. + + +``SUB_PREFIX`` +------------------ + +**Optional** + +A prefix to all your subs that can be declared globally. + +For instance, if you have two projects listening to one topic, you may want to add a +prefix so that there can be two distinct subscribers to that one topic. + + +``APP_NAME`` +------------------ + +**Optional** + +The application name. + +This should be unique to all the services running in the application ecosystem. It is used by +the LoggingMiddleware and Prometheus integration. + +.. _settings_encoder_path: + +``ENCODER_PATH`` +------------------ + +**Optional** + +Default: `rest_framework.utils.encoders.JSONEncoder `_ + +`Encoder class path `_ to use for +serializing your Python data structure to a json object when publishing. + +.. note:: The default encoder class is subject to change in an upcoming release. + It is advised that you use this setting explicitly. + +``ACK_DEADLINE`` +------------------ + +**Optional** + +Ack deadline for all subscribers in seconds. + +.. seealso:: The `Google Pub/Sub documentation `_ + which states that *The subscriber has a configurable, limited amount of time -- + known as the ackDeadline -- to acknowledge the outstanding message. Once the deadline + passes, the message is no longer considered outstanding, and Cloud Pub/Sub will attempt + to redeliver the message.* + +.. _settings_publisher_timeout: + +``PUBLISHER_TIMEOUT`` +--------------------- + +**Optional** + +Default: 3.0 seconds + +Timeout that the publishing result will wait on the future to publish successfully while blocking. + +`See Google PubSub documentation for more info +`_ + +``THREADS_PER_SUBSCRIPTION`` +---------------------------- + +**Optional** + +Default: 2 + +Number of threads that will be consumed for each subscription. +Default behavior of the Google Cloud PubSub library is to use 10 threads per subscription. +We thought this was too much for a default setting and have taken the liberty of +reducing the thread count to 2. If you would like to maintain the default Google PubSub +library behavior, please set this value to 10. + +``FILTER_SUBS_BY`` +---------------------------- + +**Optional** + +Boolean function that applies a global filter on all subscriptions. +For more information, please see `Filtering Messages section `. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e238d43 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.black] +line-length = 88 +target_version = ['py36'] +include = '\.pyi?$' diff --git a/rele/__init__.py b/rele/__init__.py index ed3d40d..e9bbdbe 100644 --- a/rele/__init__.py +++ b/rele/__init__.py @@ -1,5 +1,15 @@ -__version__ = '0.2.0' +__version__ = "1.6.0" + +try: + import django +except ImportError: + pass +else: + if django.__version__ < "3.2": + default_app_config = "rele.apps.ReleConfig" from .client import Publisher, Subscriber # noqa +from .config import setup # noqa +from .publishing import publish # noqa from .subscription import Callback, Subscription, sub # noqa from .worker import Worker # noqa diff --git a/rele/__main__.py b/rele/__main__.py new file mode 100644 index 0000000..743879f --- /dev/null +++ b/rele/__main__.py @@ -0,0 +1,45 @@ +import argparse +import logging +import os +import sys + +from rele import config, discover, subscription +from rele.worker import create_and_run + +logger = logging.getLogger(__name__) + + +def main(): + # modify path so we can import modules and packages + sys.path.insert(0, os.getcwd()) + + parser = argparse.ArgumentParser( + prog="Relé", description="Harness the power of Relé from the command line" + ) + + subparsers = parser.add_subparsers(help="Select a command", dest="command") + + run_parser = subparsers.add_parser( + "run", + help="Run a Relé worker with auto-discovery of subs modules in the " + "current path. Auto-discovery will include all subs " + "and settings modules. If no settings module is discovered, " + "defaults will be used.", + ) + run_parser.add_argument( + "--settings", + "-s", + default=None, + required=False, + help="Settings file dot path. Ex. project.settings. " + "If none is supplied, Relé will attempt to autodiscover in the root path.", + ) + args = parser.parse_args() + + if args.command == "run": + settings, module_paths = discover.sub_modules(args.settings) + configuration = config.setup(settings.RELE if settings else None) + subs = config.load_subscriptions_from_paths( + module_paths, configuration.sub_prefix, configuration.filter_by + ) + create_and_run(subs, configuration) diff --git a/rele/apps.py b/rele/apps.py index 4cb8367..eaac827 100644 --- a/rele/apps.py +++ b/rele/apps.py @@ -1,5 +1,11 @@ from django.apps import AppConfig +from django.conf import settings + +import rele.config class ReleConfig(AppConfig): - name = 'rele' + name = "rele" + + def ready(self): + rele.config.setup(settings.RELE) diff --git a/rele/client.py b/rele/client.py index bd698f3..0b5fa92 100644 --- a/rele/client.py +++ b/rele/client.py @@ -1,64 +1,190 @@ +import json import logging +import os +import time +from concurrent.futures import TimeoutError from contextlib import suppress -from django.conf import settings +import google.auth from google.api_core import exceptions from google.cloud import pubsub_v1 -from rest_framework.renderers import JSONRenderer + +from rele.middleware import run_middleware_hook logger = logging.getLogger(__name__) +USE_EMULATOR = True if os.environ.get("PUBSUB_EMULATOR_HOST") else False +DEFAULT_ENCODER_PATH = "json.JSONEncoder" +DEFAULT_ACK_DEADLINE = 60 + + +def get_google_defaults(): + try: + credentials, project = google.auth.default() + return credentials, project + except google.auth.exceptions.DefaultCredentialsError: + return None, None + class Subscriber: + """The Subscriber Class. + + For convenience, this class wraps the creation and consumption of a topic + subscription. + + :param gc_project_id: str :ref:`settings_project_id` . + :param credentials: obj :meth:`~rele.config.Config.credentials`. + :param default_ack_deadline: int Ack Deadline defined in settings + """ + + def __init__(self, gc_project_id, credentials, default_ack_deadline=None): + self._gc_project_id = gc_project_id + self._ack_deadline = default_ack_deadline or DEFAULT_ACK_DEADLINE + self.credentials = credentials if not USE_EMULATOR else None + self._client = pubsub_v1.SubscriberClient(credentials=credentials) - def __init__(self): - self._client = pubsub_v1.SubscriberClient( - credentials=settings.RELE_GC_CREDENTIALS) + def create_subscription(self, subscription): + """Handles creating the subscription when it does not exists. - def create_subscription(self, subscription, topic): + This makes it easier to deploy a worker and forget about the + subscription side of things. The subscription must + have a topic to subscribe to. Which means that the topic must be + created manually before the worker is started. + + :param subscription: str Subscription name + :param topic: str Topic name to subscribe + """ subscription_path = self._client.subscription_path( - settings.GC_PROJECT_ID, subscription) - topic_path = self._client.topic_path( - settings.RELE_GC_PROJECT_ID, topic) + self._gc_project_id, subscription.name + ) + topic_path = self._client.topic_path(self._gc_project_id, subscription.topic) with suppress(exceptions.AlreadyExists): - self._client.create_subscription( - name=subscription_path, topic=topic_path) + try: + self._create_subscription(subscription_path, topic_path, subscription) + except exceptions.NotFound: + logger.warning( + "Cannot subscribe to a topic that does not exist." + f"Creating {topic_path}..." + ) + topic = self._create_topic(topic_path) + logger.info(f"Topic {topic.name} created.") + self._create_subscription(subscription_path, topic_path, subscription) + + def _create_topic(self, topic_path): + publisher_client = pubsub_v1.PublisherClient(credentials=self.credentials) + return publisher_client.create_topic(request={"name": topic_path}) + + def _create_subscription(self, subscription_path, topic_path, subscription): + request = { + "name": subscription_path, + "topic": topic_path, + "ack_deadline_seconds": self._ack_deadline, + } + + if subscription.backend_filter_by: + request["filter"] = subscription.backend_filter_by - def subscribe(self, subscription_name, callback): + self._client.create_subscription(request=request) + + def consume(self, subscription_name, callback, scheduler): + """Begin listening to topic from the SubscriberClient. + + :param subscription_name: str Subscription name + :param callback: Function which act on a topic message + :param scheduler: `Thread pool-based scheduler. `_ # noqa + :return: `Future `_ # noqa + """ subscription_path = self._client.subscription_path( - settings.GC_PROJECT_ID, subscription_name) - return self._client.subscribe(subscription_path, callback=callback) + self._gc_project_id, subscription_name + ) + return self._client.subscribe( + subscription_path, callback=callback, scheduler=scheduler + ) + +class Publisher: + """The Publisher Class -class _PublisherSingleton(type): - _instance = None + Wraps the Google Cloud Publisher Client and handles encoding of the data. - def __call__(cls, *args, **kwargs): - if not cls._instance: - cls._instance = super(_PublisherSingleton, cls).__call__( - *args, **kwargs) - return cls._instance + It is important that this class remains a Singleton class in the process. + Otherwise, a memory leak will occur. To avoid this, it is strongly + recommended to use the :meth:`~rele.publishing.publish` method. + If the setting `USE_EMULATOR` evaluates to True, the Publisher Client will + not have any credentials assigned. -class Publisher(metaclass=_PublisherSingleton): - PUBLISH_TIMEOUT = 2.0 + :param gc_project_id: string Google Cloud Project ID. + :param credentials: string Google Cloud Credentials. + :param encoder: A valid `json.encoder.JSONEncoder subclass `_ # noqa + :param timeout: float, default :ref:`settings_publisher_timeout` + """ - def __init__(self): - self._client = pubsub_v1.PublisherClient( - credentials=settings.RELE_GC_CREDENTIALS) + def __init__(self, gc_project_id, credentials, encoder, timeout): + self._gc_project_id = gc_project_id + self._timeout = timeout + self._encoder = encoder + if USE_EMULATOR: + self._client = pubsub_v1.PublisherClient() + else: + self._client = pubsub_v1.PublisherClient(credentials=credentials) + + def publish( + self, topic, data, blocking=False, timeout=None, raise_exception=True, **attrs + ): + """Publishes message to Google PubSub topic. + + Usage:: + + publisher = Publisher() + publisher.publish('topic_name', {'foo': 'bar'}) + + By default, this method is non-blocking, meaning that the method does + not wait for the future to be returned. + + If you would like to wait for the future so you can track the message + later, you can: + + Usage:: + + publisher = Publisher() + future = publisher.publish('topic_name', {'foo': 'bar'}, blocking=True, timeout=10.0) # noqa + + However, it should be noted that using `blocking=True` may incur a + significant performance hit. + + In addition, the method adds a timestamp `published_at` to the + message attrs using `epoch floating point number + `_. + + :param topic: string topic to publish the data. + :param data: dict with the content of the message. + :param blocking: boolean + :param timeout: float, default None falls back to :ref:`settings_publisher_timeout` + :param raise_exception: boolean. If True, exceptions coming from PubSub will be raised + :param attrs: additional string parameters to be published. + :return: `Future `_ # noqa + """ + + attrs["published_at"] = str(time.time()) + run_middleware_hook("pre_publish", topic, data, attrs) + payload = json.dumps(data, cls=self._encoder).encode("utf-8") + topic_path = self._client.topic_path(self._gc_project_id, topic) + future = self._client.publish(topic_path, payload, **attrs) + if not blocking: + return future - def publish(self, topic, data, **attrs): - data = JSONRenderer().render(data) - logger.info(f'Publishing to {topic}', - extra={'pubsub_publisher_data': data}) - topic_path = self._client.topic_path( - settings.RELE_GC_PROJECT_ID, topic) - future = self._client.publish(topic_path, data, **attrs) try: - future.result(timeout=self.PUBLISH_TIMEOUT) - except Exception: - logger.error('Exception while publishing to %s', topic, - exc_info=True, extra={'pubsub_data': data}) - return False - return True + future.result(timeout=timeout or self._timeout) + except TimeoutError as e: + run_middleware_hook("post_publish_failure", topic, e, data) + if raise_exception: + raise e + else: + run_middleware_hook("post_publish_success", topic, data, attrs) + + # DEPRECATED + run_middleware_hook("post_publish", topic) + + return future diff --git a/rele/config.py b/rele/config.py new file mode 100644 index 0000000..4ad4648 --- /dev/null +++ b/rele/config.py @@ -0,0 +1,119 @@ +import importlib +import os +import warnings + +from google.oauth2 import service_account + +from .client import DEFAULT_ACK_DEADLINE, DEFAULT_ENCODER_PATH, get_google_defaults +from .middleware import default_middleware, register_middleware +from .publishing import init_global_publisher +from .subscription import Subscription + + +class Config: + """Configuration class. + + The settings.RELE dictionary will be parsed into an easily + accessible object containing all the necessary constants. + + If no middleware is set, the *default_middleware* will be added. + + :param setting: dict :ref:`settings <_settings>` + """ + + def __init__(self, setting): + self._gc_project_id = setting.get("GC_PROJECT_ID") + self.gc_credentials_path = setting.get("GC_CREDENTIALS_PATH") + self.app_name = setting.get("APP_NAME") + self.sub_prefix = setting.get("SUB_PREFIX") + self.middleware = setting.get("MIDDLEWARE", default_middleware) + self.ack_deadline = setting.get( + "ACK_DEADLINE", + os.environ.get("DEFAULT_ACK_DEADLINE", DEFAULT_ACK_DEADLINE), + ) + self._encoder_path = setting.get("ENCODER_PATH", DEFAULT_ENCODER_PATH) + self.publisher_timeout = setting.get("PUBLISHER_TIMEOUT", 3.0) + self.threads_per_subscription = setting.get("THREADS_PER_SUBSCRIPTION", 2) + self.filter_by = setting.get("FILTER_SUBS_BY") + self._credentials = None + + @property + def encoder(self): + module_name, class_name = self._encoder_path.rsplit(".", 1) + module = importlib.import_module(module_name) + return getattr(module, class_name) + + @property + def credentials(self): + if self.gc_credentials_path: + self._credentials = service_account.Credentials.from_service_account_file( + self.gc_credentials_path + ) + else: + credentials, project_id = get_google_defaults() + self._credentials = credentials + if not self._gc_project_id: + self._gc_project_id = project_id + return self._credentials + + @property + def gc_project_id(self): + if self._gc_project_id: + return self._gc_project_id + elif self.credentials: + return self.credentials.project_id + else: + return None + + +def setup(setting=None, **kwargs): + if setting is None: + setting = {} + + config = Config(setting) + init_global_publisher(config) + register_middleware(config, **kwargs) + return config + + +def subscription_from_attribute(attribute): + try: + if isinstance(attribute, Subscription): + subscription = attribute + elif issubclass(attribute, Subscription): + subscription = attribute() + else: + return None + except TypeError: + # If attribute is not a class, TypeError is raised when testing issubclass + return None + return subscription + + +def load_subscriptions_from_paths(sub_module_paths, sub_prefix=None, filter_by=None): + + subscriptions = {} + for sub_module_path in sub_module_paths: + sub_module = importlib.import_module(sub_module_path) + for attr_name in dir(sub_module): + attribute = getattr(sub_module, attr_name) + + subscription = subscription_from_attribute(attribute) + if not subscription: + continue + if sub_prefix and not subscription.prefix: + subscription.set_prefix(sub_prefix) + + if filter_by and not subscription.filter_by: + subscription.set_filters(filter_by) + + if subscription.name in subscriptions: + found_subscription = subscriptions[subscription.name] + raise RuntimeError( + f"Duplicate subscription name found: {subscription.name}. Subs " + f"{subscription._func.__module__}.{subscription._func.__name__} and " + f"{found_subscription._func.__module__}.{found_subscription._func.__name__} collide." + ) + + subscriptions[subscription.name] = subscription + return list(subscriptions.values()) diff --git a/rele/contrib/__init__.py b/rele/contrib/__init__.py new file mode 100644 index 0000000..b47f00f --- /dev/null +++ b/rele/contrib/__init__.py @@ -0,0 +1,9 @@ +from .flask_middleware import FlaskMiddleware # noqa +from .logging_middleware import LoggingMiddleware # noqa +from .unrecoverable_middleware import UnrecoverableMiddleWare # noqa +from .verbose_logging_middleware import VerboseLoggingMiddleware # noqa + +try: + from .django_db_middleware import DjangoDBMiddleware # noqa +except ImportError: + pass diff --git a/rele/contrib/django_db_middleware.py b/rele/contrib/django_db_middleware.py new file mode 100644 index 0000000..6a36bab --- /dev/null +++ b/rele/contrib/django_db_middleware.py @@ -0,0 +1,16 @@ +from django import db + +from rele.middleware import BaseMiddleware + + +class DjangoDBMiddleware(BaseMiddleware): + """Django specific middleware for managing database connections.""" + + def pre_process_message(self, *args): + db.close_old_connections() + + def post_process_message(self): + db.close_old_connections() + + def post_worker_stop(self): + db.connections.close_all() diff --git a/rele/contrib/flask_middleware.py b/rele/contrib/flask_middleware.py new file mode 100644 index 0000000..b5ae141 --- /dev/null +++ b/rele/contrib/flask_middleware.py @@ -0,0 +1,13 @@ +from rele.middleware import BaseMiddleware + + +class FlaskMiddleware(BaseMiddleware): + def setup(self, config, **kwargs): + self.app = kwargs["flask_app"] + + def pre_process_message(self, subscription, message): + self.ctx = self.app.app_context() + self.ctx.push() + + def post_process_message(self): + self.ctx.pop() diff --git a/rele/contrib/logging_middleware.py b/rele/contrib/logging_middleware.py new file mode 100644 index 0000000..a2c881a --- /dev/null +++ b/rele/contrib/logging_middleware.py @@ -0,0 +1,121 @@ +import json +import logging +import time + +from rele.middleware import BaseMiddleware + + +class LoggingMiddleware(BaseMiddleware): + """Default logging middleware. + + Logging format has been configured for Prometheus. + """ + + def __init__(self): + self._logger = None + + def setup(self, config, **kwargs): + self._logger = logging.getLogger(__name__) + self._app_name = config.app_name + + def _build_data_metrics( + self, subscription, message, status, start_processing_time=None + ): + result = { + "agent": self._app_name, + "topic": subscription.topic, + "status": status, + "subscription": subscription.name, + "attributes": dict(message.attributes), + } + + if start_processing_time is not None: + end_processing_time = time.time() + result["duration_seconds"] = round( + end_processing_time - start_processing_time, 3 + ) + + return result + + def pre_publish(self, topic, data, attrs): + self._logger.debug( + f"Publishing to {topic}", + extra={ + "pubsub_publisher_attrs": attrs, + "metrics": { + "name": "publications", + "data": {"agent": self._app_name, "topic": topic}, + }, + }, + ) + + def post_publish_success(self, topic, data, attrs): + self._logger.info( + f"Successfully published to {topic}", + extra={ + "pubsub_publisher_attrs": attrs, + "metrics": { + "name": "publications", + "data": {"agent": self._app_name, "topic": topic}, + }, + }, + ) + + def post_publish_failure(self, topic, exception, message): + self._logger.exception( + f"Exception raised while publishing message " + f"for {topic}: {str(exception.__class__.__name__)}", + exc_info=True, + extra={ + "metrics": { + "name": "publications", + "data": {"agent": self._app_name, "topic": topic}, + }, + "subscription_message": json.dumps(message), + }, + ) + + def pre_process_message(self, subscription, message): + self._logger.debug( + f"Start processing message for {subscription}", + extra={ + "metrics": { + "name": "subscriptions", + "data": self._build_data_metrics(subscription, message, "received"), + } + }, + ) + + def post_process_message_success(self, subscription, start_time, message): + self._logger.info( + f"Successfully processed message for {subscription}", + extra={ + "metrics": { + "name": "subscriptions", + "data": self._build_data_metrics( + subscription, message, "succeeded", start_time + ), + } + }, + ) + + def post_process_message_failure( + self, subscription, exception, start_time, message + ): + self._logger.error( + f"Exception raised while processing message " + f"for {subscription}: {str(exception.__class__.__name__)}", + exc_info=True, + extra={ + "metrics": { + "name": "subscriptions", + "data": self._build_data_metrics( + subscription, message, "failed", start_time + ), + }, + "subscription_message": str(message), + }, + ) + + def pre_worker_stop(self, subscriptions): + self._logger.info(f"Cleaning up {len(subscriptions)} subscription(s)...") diff --git a/rele/contrib/unrecoverable_middleware.py b/rele/contrib/unrecoverable_middleware.py new file mode 100644 index 0000000..3f39126 --- /dev/null +++ b/rele/contrib/unrecoverable_middleware.py @@ -0,0 +1,11 @@ +from rele.middleware import BaseMiddleware + + +class UnrecoverableException(Exception): + pass + + +class UnrecoverableMiddleWare(BaseMiddleware): + def post_process_message_failure(self, subscription, err, start_time, message): + if isinstance(err, UnrecoverableException): + message.ack() diff --git a/rele/contrib/verbose_logging_middleware.py b/rele/contrib/verbose_logging_middleware.py new file mode 100644 index 0000000..e45f522 --- /dev/null +++ b/rele/contrib/verbose_logging_middleware.py @@ -0,0 +1,46 @@ +import json + +from rele.contrib.logging_middleware import LoggingMiddleware + + +class VerboseLoggingMiddleware(LoggingMiddleware): + def post_process_message_failure( + self, subscription, exception, start_time, message + ): + super().post_process_message_failure( + subscription, exception, start_time, _VerboseMessage(message) + ) + + +class _VerboseMessage: + def __init__(self, message): + self._message = message + self.attributes = message.attributes + + def __repr__(self): + _MESSAGE_REPR = """\ +Message {{ + data: {!r} + ordering_key: {!r} + attributes: {} +}}""" + + data = self._message._message.data + attrs = self._message_attrs_repr() + ordering_key = str(self._message.ordering_key) + + return _MESSAGE_REPR.format(data, ordering_key, attrs) + + def _message_attrs_repr(self): + message_attrs = json.dumps( + dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True + ) + + indented = [] + for line in message_attrs.split("\n"): + indented.append(" " + line) + + message_attrs = "\n".join(indented) + message_attrs = message_attrs.lstrip() + + return message_attrs diff --git a/rele/discover.py b/rele/discover.py new file mode 100644 index 0000000..6dd6919 --- /dev/null +++ b/rele/discover.py @@ -0,0 +1,53 @@ +import importlib +import logging +import pkgutil +from importlib.util import find_spec as importlib_find + +logger = logging.getLogger(__name__) + + +def module_has_submodule(package, module_name): + """ + See if 'module' is in 'package'. + Taken from https://github.com/django/django/blob/master/django/utils/module_loading.py#L63 + """ + imported_package = importlib.import_module(package) + package_name = imported_package.__name__ + package_path = imported_package.__path__ + full_module_name = package_name + "." + module_name + try: + return importlib_find(full_module_name, package_path) is not None + except (ModuleNotFoundError, AttributeError): + # When module_name is an invalid dotted path, Python raises + # ModuleNotFoundError. + return False + + +def _import_settings_from_path(path): + if path is not None: + print(" * Discovered settings: %r" % path) + return importlib.import_module(path) + + +def sub_modules(settings_path=None): + """ + In the current PYTHONPATH, we can traverse all modules and determine if they + have a settings.py or directory with a subs.py module. If either one of + those exists, we import it, and return the settings module, and + paths to the subs file. + + If a settings module is not found, we return None. + + :return: (settings module, List[string: subs module paths]) + """ + module_paths = [] + for f, package, is_package in pkgutil.walk_packages(path=["."]): + if package == "settings": + settings_path = package + if is_package and module_has_submodule(package, "subs"): + module = package + ".subs" + module_paths.append(module) + print(" * Discovered subs module: %r" % module) + + settings = _import_settings_from_path(settings_path) + return settings, module_paths diff --git a/rele/management/commands/runmaruja.py b/rele/management/commands/runmaruja.py deleted file mode 100644 index 7eb2c4f..0000000 --- a/rele/management/commands/runmaruja.py +++ /dev/null @@ -1,58 +0,0 @@ -import importlib -import logging -import signal -import time - -from django.apps import apps -from django.core.management import BaseCommand -from django.utils.module_loading import module_has_submodule - -from rele import Subscription, Worker - -logger = logging.getLogger(__name__) - - -class Command(BaseCommand): - help = 'Start subscriber threads to consume rele topics.' - - def handle(self, *args, **options): - subs = self._autodiscover_subs() - self.stdout.write(f'Configuring worker with {len(subs)} ' - f'subscription(s)...') - for sub in subs: - self.stdout.write(f' {sub}') - worker = Worker(subs) - worker.setup() - worker.start() - - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, worker.stop) - signal.signal(signal.SIGTSTP, worker.stop) - - self._wait_forever() - - def _discover_subs_modules(self): - logger.debug('Autodiscovering subs...') - app_configs = apps.get_app_configs() - subs_modules = [] - for conf in app_configs: - if module_has_submodule(conf.module, "subs"): - module = conf.name + ".subs" - subs_modules.append(module) - self.stdout.write(" * Discovered subs module: %r" % module) - return subs_modules - - def _autodiscover_subs(self): - subscriptions = [] - for sub_module_path in self._discover_subs_modules(): - sub_module = importlib.import_module(sub_module_path) - for attr_name in dir(sub_module): - attribute = getattr(sub_module, attr_name) - if isinstance(attribute, Subscription): - subscriptions.append(attribute) - return subscriptions - - def _wait_forever(self): - self.stdout.write('Consuming subscriptions...') - while True: - time.sleep(1) diff --git a/rele/management/commands/runrele.py b/rele/management/commands/runrele.py new file mode 100644 index 0000000..7795a45 --- /dev/null +++ b/rele/management/commands/runrele.py @@ -0,0 +1,30 @@ +import logging + +from django.conf import settings +from django.core.management import BaseCommand + +from rele import config +from rele.management.discover import discover_subs_modules +from rele.worker import create_and_run + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Start subscriber threads to consume messages from Relé topics." + config = config.Config(settings.RELE) + + def handle(self, *args, **options): + if all(map(lambda x: x.get("CONN_MAX_AGE"), settings.DATABASES.values())): + self.stderr.write( + self.style.WARNING( + "WARNING: settings.CONN_MAX_AGE is not set to 0. " + "This may result in slots for database connections to " + "be exhausted." + ) + ) + subs = config.load_subscriptions_from_paths( + discover_subs_modules(), self.config.sub_prefix, self.config.filter_by + ) + self.stdout.write(f"Configuring worker with {len(subs)} " f"subscription(s)...") + create_and_run(subs, self.config) diff --git a/rele/management/commands/showsubscriptions.py b/rele/management/commands/showsubscriptions.py new file mode 100644 index 0000000..0a5dcfc --- /dev/null +++ b/rele/management/commands/showsubscriptions.py @@ -0,0 +1,21 @@ +from django.core.management import BaseCommand +from tabulate import tabulate + +from rele.config import load_subscriptions_from_paths +from rele.management.discover import discover_subs_modules + + +class Command(BaseCommand): + help = "List information about Pub/Sub subscriptions registered using Relé." + + def handle(self, *args, **options): + headers = ["Topic", "Subscriber(s)", "Sub"] + + subscription_paths = discover_subs_modules() + subs = sorted( + load_subscriptions_from_paths(subscription_paths), + key=lambda sub: sub.topic, + ) + sub_data = [(sub.topic, sub.name, sub._func.__name__) for sub in subs] + + self.stdout.write(tabulate(sub_data, headers=headers)) diff --git a/rele/management/discover.py b/rele/management/discover.py new file mode 100644 index 0000000..71f4dd9 --- /dev/null +++ b/rele/management/discover.py @@ -0,0 +1,18 @@ +import logging + +from django.apps import apps +from django.utils.module_loading import module_has_submodule + +logger = logging.getLogger(__name__) + + +def discover_subs_modules(): + logger.debug("Autodiscovering subs...") + app_configs = apps.get_app_configs() + subs_modules = [] + for conf in app_configs: + if module_has_submodule(conf.module, "subs"): + module = conf.name + ".subs" + subs_modules.append(module) + logger.debug(" * Discovered subs module: %r" % module) + return subs_modules diff --git a/rele/middleware.py b/rele/middleware.py new file mode 100644 index 0000000..20085cf --- /dev/null +++ b/rele/middleware.py @@ -0,0 +1,111 @@ +import importlib +import warnings + +_middlewares = [] + +default_middleware = ["rele.contrib.LoggingMiddleware"] +DEPRECATED_HOOKS = ["post_publish"] + + +def register_middleware(config, **kwargs): + paths = config.middleware + global _middlewares + _middlewares = [] + for path in paths: + *module_parts, middleware_class = path.split(".") + module_path = ".".join(module_parts) + module = importlib.import_module(module_path) + middleware = getattr(module, middleware_class)() + middleware.setup(config, **kwargs) + _middlewares.append(middleware) + + +def run_middleware_hook(hook_name, *args, **kwargs): + for middleware in _middlewares: + if hook_name not in DEPRECATED_HOOKS or hasattr(middleware, hook_name): + getattr(middleware, hook_name)(*args, **kwargs) + + +class WarnDeprecatedHooks(type): + def __new__(cls, *args, **kwargs): + x = super().__new__(cls, *args, **kwargs) + for deprecated_hook in DEPRECATED_HOOKS: + if hasattr(x, deprecated_hook): + warnings.warn( + "The post_publish hook in the middleware is deprecated " + "and will be removed in future versions. Please substitute it with " + "the post_publish_success hook instead.", + DeprecationWarning, + ) + return x + + +class BaseMiddleware(metaclass=WarnDeprecatedHooks): + """Base class for middleware. The default implementations + for all hooks are no-ops and subclasses may implement whatever + subset of hooks they like. + """ + + def setup(self, config, **kwargs): + """Called when middleware is registered. + :param config: Relé Config object + """ + + def pre_publish(self, topic, data, attrs): + """Called before Publisher sends message. + :param topic: + :param data: + :param attrs: + """ + + def post_publish_success(self, topic, data, attrs): + """Called after Publisher succesfully sends message. + :param topic: + :param data: + :param attrs: + """ + + def post_publish_failure(self, topic, exception, message): + """Called after publishing fails. + :param topic: + :param exception: + :param message: + """ + + def pre_process_message(self, subscription, message): + """Called when the Worker receives a message. + :param subscription: + :param message: + """ + + def post_process_message(self): + """Called after the Worker processes the message.""" + + def post_process_message_success(self, subscription, start_time, message): + """Called after the message has been successfully processed. + :param subscription: + :param start_time: + :param message: + """ + + def post_process_message_failure( + self, subscription, exception, start_time, message + ): + """Called after the message has been unsuccessfully processed. + :param subscription: + :param exception: + :param start_time: + :param message: + """ + + def pre_worker_start(self): + """Called before the Worker process starts up.""" + + def post_worker_start(self): + """Called after the Worker process starts up.""" + + def pre_worker_stop(self, subscriptions): + """Called before the Worker process shuts down.""" + + def post_worker_stop(self): + """Called after the Worker process shuts down.""" diff --git a/rele/publishing.py b/rele/publishing.py new file mode 100644 index 0000000..3368548 --- /dev/null +++ b/rele/publishing.py @@ -0,0 +1,52 @@ +from rele import config, discover + +from .client import Publisher + +_publisher = None + + +def init_global_publisher(config): + global _publisher + if not _publisher: + _publisher = Publisher( + gc_project_id=config.gc_project_id, + credentials=config.credentials, + encoder=config.encoder, + timeout=config.publisher_timeout, + ) + return _publisher + + +def publish(topic, data, **kwargs): + """Shortcut method to publishing data to PubSub. + + This is a shortcut method that instantiates the Publisher if not already + instantiated in the process. This is to ensure that the Publisher remains a + Singleton class. + + Usage:: + + import rele + + def myfunc(): + # ... + rele.publish(topic='lets-tell-everyone', + data={'foo': 'bar'}, + myevent='arrival') + + :param topic: str PubSub topic name + :param data: dict-like Data to be sent as the message. + :param timeout: float. Default None, falls back to RELE['PUBLISHER_TIMEOUT'] value + :param blocking: boolean. Default False + :param kwargs: Any optional key-value pairs that are included as attributes + in the message + :return: None + """ + if not _publisher: + settings, _ = discover.sub_modules() + if not hasattr(settings, "RELE"): + raise ValueError("Config setup not called and settings module not found.") + + config.setup(settings.RELE) + + _publisher.publish(topic, data, **kwargs) diff --git a/rele/subscription.py b/rele/subscription.py index 3a9bc9e..078ae78 100644 --- a/rele/subscription.py +++ b/rele/subscription.py @@ -1,57 +1,219 @@ import json import logging +import time +from collections.abc import Iterable +from inspect import getfullargspec, getmodule -from django import db -from django.conf import settings +from .middleware import run_middleware_hook logger = logging.getLogger(__name__) class Subscription: + """The Subscription class - def __init__(self, func, topic, suffix=None): + In addition to using the ``@sub`` decorator, it is possible to subclass + the Subscription. + + For example:: + + from rele import Subscription + + class DoSomethingSub(Subscription): + topic = 'photo-uploaded' + + def __init__(self): + self._func = self.callback_func + super().__init__(self._func, self.topic) + + def callback_func(self, data, **kwargs): + print(data["id"]) + + If ``rele-cli run`` is used, the ``DoSomethingSub`` will be a valid subscription + and registered on Google Cloud. + + """ + + def __init__( + self, func, topic, prefix="", suffix="", filter_by=None, backend_filter_by=None + ): self._func = func self.topic = topic - self.name = f'{settings.BASE_DIR.split("/")[-1]}-{topic}' - if suffix: - self.name += f'-{suffix}' + self._prefix = prefix + self._suffix = suffix + self._filters = self._init_filters(filter_by) + self.backend_filter_by = backend_filter_by + + def _init_filters(self, filter_by): + if filter_by and not ( + callable(filter_by) + or ( + isinstance(filter_by, Iterable) + and all(callable(filter) for filter in filter_by) + ) + ): + raise ValueError("Filter_by must be a callable or a list of callables.") + + if isinstance(filter_by, Iterable): + return filter_by + elif filter_by: + return [filter_by] + + return None + + @property + def name(self): + name_parts = [self._prefix, self.topic, self._suffix] + return "-".join(filter(lambda x: x, name_parts)) + + @property + def prefix(self): + return self._prefix + + def set_prefix(self, prefix): + self._prefix = prefix + + @property + def filter_by(self): + return self._filters + + def set_filters(self, filter_by): + self._filters = filter_by def __call__(self, data, **kwargs): - self._func(data, **kwargs) + if "published_at" in kwargs: + kwargs["published_at"] = float(kwargs["published_at"]) + + if self._any_filter_returns_false(kwargs): + return + + return self._func(data, **kwargs) def __str__(self): - return f'{self.name} - {self._func.__name__}' + return f"{self.name} - {self._func.__name__}" + def _any_filter_returns_false(self, kwargs): + if not self._filters: + return False + + return not all(filter(kwargs) for filter in self._filters) -class Callback: +class Callback: def __init__(self, subscription, suffix=None): self._subscription = subscription self._suffix = suffix def __call__(self, message): - db.close_old_connections() + run_middleware_hook("pre_process_message", self._subscription, message) + start_time = time.time() - logger.info(f'Start processing message for {self._subscription}') - data = json.loads(message.data.decode('utf-8')) try: - self._subscription(data, **dict(message.attributes)) + data = json.loads(message.data.decode("utf-8")) + except json.JSONDecodeError as e: + message.ack() + run_middleware_hook( + "post_process_message_failure", + self._subscription, + e, + start_time, + message, + ) + run_middleware_hook("post_process_message") + return + + try: + res = self._subscription(data, **dict(message.attributes)) except Exception as e: - logger.error(f'Exception raised while processing message ' - f'for {self._subscription}: ' - f'{str(e.__class__.__name__)}', - exc_info=True) + run_middleware_hook( + "post_process_message_failure", + self._subscription, + e, + start_time, + message, + ) else: message.ack() - logger.info(f'Successfully processed message for ' - f'{self._subscription}') + run_middleware_hook( + "post_process_message_success", + self._subscription, + start_time, + message, + ) + return res finally: - db.close_old_connections() + run_middleware_hook("post_process_message") + + +def sub(topic, prefix=None, suffix=None, filter_by=None, backend_filter_by=None): + """Decorator function that makes declaring a PubSub Subscription simple. + + The Subscriber returned will automatically create and name + the subscription for the topic. + The subscription name will be the topic name prefixed by the project name. + + For example, if the topic name to subscribe too is `lets-tell-everyone`, + the subscriber will be named `project-name-lets-tell-everyone`. + + Additionally, if a `suffix` param is added, the subscriber will be + `project-name-lets-tell-everyone-my-suffix`. + + It is recommended to add `**kwargs` to your `sub` function. This will allow + message attributes to be sent without breaking the subscriber + implementation. + + Usage:: + + @sub(topic='lets-tell-to-alice', prefix='shop') + def bob_purpose(data, **kwargs): + pass + + @sub(topic='lets-tell-everyone', suffix='sub1') + def purpose_1(data, **kwargs): + pass + + @sub(topic='lets-tell-everyone', suffix='sub2') + def purpose_2(data, **kwargs): + pass + @sub(topic='photo-updated', + filter_by=lambda **attrs: attrs.get('type') == 'landscape') + def sub_process_landscape_photos(data, **kwargs): + pass -def sub(topic, suffix=None): + :param topic: string The topic that is being subscribed to. + :param prefix: string An optional prefix to the subscription name. + Useful to namespace your subscription with your project name + :param suffix: string An optional suffix to the subscription name. + Useful when you have two subscribers in the same project + that are subscribed to the same topic. + :param filter_by: Union[function, list] An optional function or tuple of + functions that filters the messages to be processed by + the sub regarding their attributes. + :return: :class:`~rele.subscription.Subscription` + """ def decorator(func): - return Subscription(func=func, topic=topic, suffix=suffix) + args_spec = getfullargspec(func) + if len(args_spec.args) != 1 or not args_spec.varkw: + raise RuntimeError( + f"Subscription function {func.__module__}.{func.__name__} is not valid. " + "The function must have one argument and accept keyword arguments." + ) + + if getmodule(func).__name__.split(".")[-1] != "subs": + logger.warning( + f"Subscription function {func.__module__}.{func.__name__} is " + "outside a subs module that will not be discovered." + ) + + return Subscription( + func=func, + topic=topic, + prefix=prefix, + suffix=suffix, + filter_by=filter_by, + backend_filter_by=backend_filter_by, + ) return decorator diff --git a/rele/worker.py b/rele/worker.py index 316b2c8..639dae5 100644 --- a/rele/worker.py +++ b/rele/worker.py @@ -1,37 +1,162 @@ import logging +import signal import sys +import time +from concurrent import futures -from django import db +from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler from .client import Subscriber +from .middleware import run_middleware_hook from .subscription import Callback logger = logging.getLogger(__name__) class Worker: + """A Worker manages the subscriptions which consume Google PubSub messages. - def __init__(self, subscriptions): - self._subscriber = Subscriber() - self._futures = [] + Facilitates the creation of subscriptions if not already created, + and the starting and stopping the consumption of them. + + :param subscriptions: list :class:`~rele.subscription.Subscription` + """ + + def __init__( + self, + subscriptions, + gc_project_id=None, + credentials=None, + default_ack_deadline=None, + threads_per_subscription=None, + ): + self._subscriber = Subscriber(gc_project_id, credentials, default_ack_deadline) + self._futures = {} self._subscriptions = subscriptions + self.threads_per_subscription = threads_per_subscription def setup(self): + """Create the subscriptions on a Google PubSub topic. + + If the subscription already exists, the subscription will not be + re-created. Therefore, it is idempotent. + """ for subscription in self._subscriptions: - self._subscriber.create_subscription(subscription.name, - subscription.topic) + self._subscriber.create_subscription(subscription) def start(self): + """Begin consuming all subscriptions. + + When consuming a subscription, a ``StreamingPullFuture`` is returned from + the Google PubSub client library. This future can be used to + manage the background stream. + + The futures are stored so that they can be cancelled later on + for a graceful shutdown of the worker. + """ + run_middleware_hook("pre_worker_start") for subscription in self._subscriptions: - self._futures.append(self._subscriber.subscribe( - subscription_name=subscription.name, - callback=Callback(subscription) - )) + self._boostrap_consumption(subscription) + run_middleware_hook("post_worker_start") + + def run_forever(self, sleep_interval=1): + """Shortcut for calling setup, start, and _wait_forever. + + :param sleep_interval: Number of seconds to sleep in the ``while True`` loop + """ + self.setup() + self.start() + self._wait_forever(sleep_interval=sleep_interval) def stop(self, signal=None, frame=None): - logger.info(f'Cleaning up {len(self._futures)} subscription(s)...') - for future in self._futures: + """Manage the shutdown process of the worker. + + This function has two purposes: + + 1. Cancel all the futures created. + 2. And close all the database connections + opened by Django. Even though we cancel the connections + for every execution of the callback, we want to be sure + that all the database connections are closed + in this process. + + Exits with code 0 for a clean exit. + + :param signal: Needed for `signal.signal `_ # noqa + :param frame: Needed for `signal.signal `_ # noqa + """ + run_middleware_hook("pre_worker_stop", self._subscriptions) + for future in self._futures.values(): future.cancel() - db.connections.close_all() + run_middleware_hook("post_worker_stop") sys.exit(0) + + def _boostrap_consumption(self, subscription): + if subscription in self._futures: + self._futures[subscription].cancel() + + executor_kwargs = {"thread_name_prefix": "ThreadPoolExecutor-ThreadScheduler"} + executor = futures.ThreadPoolExecutor( + max_workers=self.threads_per_subscription, **executor_kwargs + ) + scheduler = ThreadScheduler(executor=executor) + + self._futures[subscription] = self._subscriber.consume( + subscription_name=subscription.name, + callback=Callback(subscription), + scheduler=scheduler, + ) + + def _wait_forever(self, sleep_interval): + logger.info("Consuming subscriptions...") + while True: + for subscription, future in self._futures.items(): + if future.cancelled() or future.done(): + logger.info(f"Restarting consumption of {subscription.name}.") + self._boostrap_consumption(subscription) + + time.sleep(sleep_interval) + + +def _get_stop_signal(): + """ + Get stop signal for worker. + Returns `SIGBREAK` on windows because `SIGSTP` doesn't exist on it + """ + if sys.platform.startswith("win"): + # SIGSTP doesn't exist on windows, so we use SIGBREAK instead + return signal.SIGBREAK + + return signal.SIGTSTP + + +def create_and_run(subs, config): + """ + Create and run a worker from a list of Subscription objects and a config + while waiting forever, until the process is stopped. + + We stop a worker process on: + - SIGINT + - SIGTSTP + + :param subs: List :class:`~rele.subscription.Subscription` + :param config: :class:`~rele.config.Config` + """ + print(f"Configuring worker with {len(subs)} subscription(s)...") + for sub in subs: + print(f" {sub}") + worker = Worker( + subs, + config.gc_project_id, + config.credentials, + config.ack_deadline, + config.threads_per_subscription, + ) + + # to allow killing runrele worker via ctrl+c + signal.signal(signal.SIGINT, worker.stop) + signal.signal(signal.SIGTERM, worker.stop) + signal.signal(_get_stop_signal(), worker.stop) + + worker.run_forever() diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index fc71f49..0000000 --- a/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -django -djangorestframework -google-cloud-pubsub diff --git a/requirements/base.txt b/requirements/base.txt new file mode 100644 index 0000000..9c558e3 --- /dev/null +++ b/requirements/base.txt @@ -0,0 +1 @@ +. diff --git a/requirements/deploy.txt b/requirements/deploy.txt new file mode 100644 index 0000000..af996cf --- /dev/null +++ b/requirements/deploy.txt @@ -0,0 +1 @@ +twine diff --git a/requirements/django.txt b/requirements/django.txt new file mode 100644 index 0000000..f72e43c --- /dev/null +++ b/requirements/django.txt @@ -0,0 +1,2 @@ +django +tabulate diff --git a/requirements/docs.txt b/requirements/docs.txt new file mode 100644 index 0000000..2806c16 --- /dev/null +++ b/requirements/docs.txt @@ -0,0 +1 @@ +Sphinx diff --git a/requirements/test.txt b/requirements/test.txt new file mode 100644 index 0000000..6994903 --- /dev/null +++ b/requirements/test.txt @@ -0,0 +1,7 @@ +pytest>=4.6.0 +pytest-cov>=2.7.0 +pytest-django>=3.5 +coverage>=4.4.0 +codecov>=2.0.0 +black +isort~=4.3.0 diff --git a/requirements_test.txt b/requirements_test.txt deleted file mode 100644 index 5f2e156..0000000 --- a/requirements_test.txt +++ /dev/null @@ -1,5 +0,0 @@ -coverage==4.4.1 -flake8>=2.1.0 -tox>=1.7.0 -codecov>=2.0.0 -pytest \ No newline at end of file diff --git a/runtests.py b/runtests.py index 7441944..d8eea6c 100644 --- a/runtests.py +++ b/runtests.py @@ -1,12 +1,15 @@ #!/usr/bin/env python # -*- coding: utf-8 -from __future__ import unicode_literals, absolute_import +from __future__ import absolute_import, unicode_literals import os import sys +from unittest.mock import patch import django +from rele.apps import ReleConfig + class PytestTestRunner(object): """Runs pytest to discover and run tests.""" @@ -25,15 +28,15 @@ def run_tests(self, test_labels): argv = [] if self.verbosity == 0: - argv.append('--quiet') + argv.append("--quiet") if self.verbosity == 2: - argv.append('--verbose') + argv.append("--verbose") if self.verbosity == 3: - argv.append('-vv') + argv.append("-vv") if self.failfast: - argv.append('--exitfirst') + argv.append("--exitfirst") if self.keepdb: - argv.append('--reuse-db') + argv.append("--reuse-db") argv.extend(test_labels) return pytest.main(argv) @@ -41,14 +44,17 @@ def run_tests(self, test_labels): def run_tests(*test_args): if not test_args: - test_args = ['tests'] + test_args = ["tests"] + + os.environ["DJANGO_SETTINGS_MODULE"] = "tests.settings" + + with patch.object(ReleConfig, "ready"): + django.setup() - os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.settings' - django.setup() test_runner = PytestTestRunner() failures = test_runner.run_tests(test_args) sys.exit(bool(failures)) -if __name__ == '__main__': +if __name__ == "__main__": run_tests(*sys.argv[1:]) diff --git a/setup.cfg b/setup.cfg index 2b889f8..e81121b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,28 +5,24 @@ universal = 1 exclude= __pycache__, .git, - .tox, build, dist ignore=D1,D401 -max-line-length=79 +max-line-length=90 max-complexity=10 [isort] -line_length=79 +line_length=90 default_section=THIRDPARTY known_first_party=rele multi_line_output=3 use_parentheses=true +skip=docs/ [coverage:run] include=* -omit= - manage.py - */__init__.py - */conftest.py - */settings/* - */wsgi.py - */migrations/* - */tests/* - */admin.py +omit=*/__init__.py + +[tool:pytest] +filterwarnings = + ignore::UserWarning diff --git a/setup.py b/setup.py index 1b2cbfb..282f7f0 100644 --- a/setup.py +++ b/setup.py @@ -4,74 +4,64 @@ import re import sys -try: - from setuptools import setup -except ImportError: - from distutils.core import setup +from setuptools import find_packages, setup def get_version(*file_paths): """Retrieves the version from rele/__init__.py""" filename = os.path.join(os.path.dirname(__file__), *file_paths) version_file = open(filename).read() - version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", - version_file, re.M) + version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", version_file, re.M) if version_match: return version_match.group(1) - raise RuntimeError('Unable to find version string.') + raise RuntimeError("Unable to find version string.") -version = get_version('rele', '__init__.py') +version = get_version("rele", "__init__.py") -if sys.argv[-1] == 'publish': - try: - import wheel - print('Wheel version: ', wheel.__version__) - except ImportError: - print('Wheel library missing. Please run "pip install wheel"') - sys.exit() - os.system('python setup.py sdist upload') - os.system('python setup.py bdist_wheel upload') - sys.exit() - -if sys.argv[-1] == 'tag': - print('Tagging the version on git:') +if sys.argv[-1] == "tag": + print("Tagging the version on git: %s" % version) os.system('git tag -a %s -m "version %s"' % (version, version)) - os.system('git push --tags') + os.system("git push --tags") sys.exit() -readme = open('README.md').read() +readme = open("README.md").read() setup( - name='rele', + name="rele", version=version, - description="""Google PubSub for Django""", + description="""Relé makes integration with Google PubSub easier.""", long_description=readme, - author='Mercadona', - author_email='mercadonaonline@mercadona.es', - url='https://github.com/mercadona/rele', - packages=[ - 'rele', - 'rele.management', - 'rele.management.commands', - ], + long_description_content_type="text/markdown", + author="Mercadona Tech", + author_email="software.online@mercadona.es", + url="https://github.com/mercadona/rele", + packages=find_packages(exclude=("tests",)), include_package_data=True, - install_requires=['django', 'google-cloud-pubsub'], - license='Apache Software License 2.0', + install_requires=["google-auth", "google-cloud-pubsub>=2.2.0"], + extras_require={"django": ["django", "tabulate"], "flask": ["flask"]}, + license="Apache Software License 2.0", zip_safe=False, - keywords='rele', + keywords="rele", classifiers=[ - 'Development Status :: 3 - Alpha', - 'Framework :: Django :: 1.11', - 'Framework :: Django :: 2.0', - 'Framework :: Django :: 2.1', - 'Framework :: Django :: 2.2', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Natural Language :: English', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', + "Development Status :: 3 - Alpha", + "Framework :: Django :: 1.11", + "Framework :: Django :: 2.0", + "Framework :: Django :: 2.1", + "Framework :: Django :: 2.2", + "Framework :: Django :: 3.0", + "Framework :: Django :: 3.1", + "Framework :: Django :: 3.2", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Natural Language :: English", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", ], + entry_points={"console_scripts": ["rele-cli=rele.__main__:main"]}, ) diff --git a/tests/commands/__init__.py b/tests/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/commands/test_runrele.py b/tests/commands/test_runrele.py new file mode 100644 index 0000000..06128fe --- /dev/null +++ b/tests/commands/test_runrele.py @@ -0,0 +1,41 @@ +from unittest.mock import ANY, patch + +import pytest +from django.core.management import call_command + +from rele import Worker + + +class TestRunReleCommand: + @pytest.fixture(autouse=True) + def worker_wait_forever(self): + with patch.object( + Worker, "_wait_forever", return_value=None, autospec=True + ) as p: + yield p + + @pytest.fixture + def mock_worker(self): + with patch("rele.worker.Worker", autospec=True) as p: + yield p + + def test_calls_worker_start_and_setup_when_runrele(self, mock_worker): + call_command("runrele") + + mock_worker.assert_called_with([], "rele-test", ANY, 60, 2) + mock_worker.return_value.run_forever.assert_called_once_with() + + def test_prints_warning_when_conn_max_age_not_set_to_zero( + self, mock_worker, capsys, settings + ): + settings.DATABASES = {"default": {"CONN_MAX_AGE": 1}} + call_command("runrele") + + out, err = capsys.readouterr() + assert ( + "WARNING: settings.CONN_MAX_AGE is not set to 0. " + "This may result in slots for database connections to " + "be exhausted." in err + ) + mock_worker.assert_called_with([], "rele-test", ANY, 60, 2) + mock_worker.return_value.run_forever.assert_called_once_with() diff --git a/tests/commands/test_showsubscriptions.py b/tests/commands/test_showsubscriptions.py new file mode 100644 index 0000000..7f239af --- /dev/null +++ b/tests/commands/test_showsubscriptions.py @@ -0,0 +1,57 @@ +from unittest.mock import patch + +import pytest +from django.core.management import call_command + +from rele.subscription import sub + + +@sub(topic="some-cool-topic", prefix="rele") +def sub_stub(data, **kwargs): + return data["id"] + + +@sub(topic="some-fancy-topic") +def sub_fancy_stub(data, **kwargs): + return data["id"] + + +@sub(topic="published-time-type") +def sub_published_time_type(data, **kwargs): + return f'{type(kwargs["published_at"])}' + + +def landscape_filter(**kwargs): + return kwargs.get("type") == "landscape" + + +@sub(topic="photo-updated", filter_by=landscape_filter) +def sub_process_landscape_photos(data, **kwargs): + return f'Received a photo of type {kwargs.get("type")}' + + +@pytest.fixture() +def mock_discover_subs(): + affected_path = ( + "rele.management.commands.showsubscriptions" ".discover_subs_modules" + ) + with patch(affected_path, return_value=[__name__]) as mock: + yield mock + + +class TestShowSubscriptions: + def test_prints_table_when_called(self, capfd, mock_discover_subs): + call_command("showsubscriptions") + + mock_discover_subs.assert_called_once() + captured = capfd.readouterr() + assert ( + captured.out + == """Topic Subscriber(s) Sub +------------------- -------------------- ---------------------------- +photo-updated photo-updated sub_process_landscape_photos +published-time-type published-time-type sub_published_time_type +some-cool-topic rele-some-cool-topic sub_stub +some-fancy-topic some-fancy-topic sub_fancy_stub +""" + ) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..62e9ca3 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,104 @@ +import concurrent +import decimal +import json +from unittest.mock import MagicMock, patch + +import pytest +from google.cloud.pubsub_v1 import PublisherClient +from google.cloud.pubsub_v1.exceptions import TimeoutError +from google.protobuf import timestamp_pb2 + +from rele import Publisher +from rele.client import Subscriber +from rele.config import Config +from rele.middleware import register_middleware + + +@pytest.fixture +def project_id(): + return "rele-test" + + +@pytest.fixture +def config(project_id): + return Config( + { + "APP_NAME": "rele", + "SUB_PREFIX": "rele", + "GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json", + "MIDDLEWARE": ["rele.contrib.LoggingMiddleware"], + } + ) + + +@pytest.fixture +def subscriber(project_id, config): + return Subscriber(config.gc_project_id, config.credentials, 60) + + +@pytest.fixture +def mock_future(): + return MagicMock(spec=concurrent.futures.Future) + + +@pytest.fixture +def publisher(config, mock_future): + publisher = Publisher( + gc_project_id=config.gc_project_id, + credentials=config.credentials, + encoder=config.encoder, + timeout=config.publisher_timeout, + ) + publisher._client = MagicMock(spec=PublisherClient) + publisher._client.publish.return_value = mock_future + + return publisher + + +@pytest.fixture +def published_at(): + return 1560244246.863829 + + +@pytest.fixture +def time_mock(published_at): + with patch("time.time") as mock: + mock.return_value = published_at + yield mock + + +@pytest.fixture(autouse=True) +def default_middleware(config): + register_middleware(config=config) + + +@pytest.fixture +def custom_encoder(): + class DecimalEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, decimal.Decimal): + return float(obj) + + return DecimalEncoder + + +@pytest.fixture +def mock_publish_timeout(): + with patch("rele.client.Publisher.publish") as mock: + mock.side_effect = TimeoutError() + yield mock + + +@pytest.fixture +def mock_post_publish_failure(): + with patch( + "rele.contrib.logging_middleware.LoggingMiddleware.post_publish_failure" + ) as mock: + yield mock + + +@pytest.fixture +def publish_time(): + timestamp = timestamp_pb2.Timestamp() + timestamp.GetCurrentTime() + return timestamp diff --git a/tests/contrib/test_logging_middleware.py b/tests/contrib/test_logging_middleware.py new file mode 100644 index 0000000..b6669be --- /dev/null +++ b/tests/contrib/test_logging_middleware.py @@ -0,0 +1,101 @@ +import queue +from unittest.mock import MagicMock + +import pytest +from google.cloud import pubsub_v1 +from tests.subs import sub_stub + +from rele.contrib.logging_middleware import LoggingMiddleware + + +@pytest.fixture +def message_data(): + return {"foo": "bar"} + + +@pytest.fixture +def expected_message_data_log(): + return '{"foo": "bar"}' + + +@pytest.fixture +def message_wrapper(published_at, publish_time): + rele_message = pubsub_v1.types.PubsubMessage( + data='{"foo": "bar"}'.encode("utf-8"), + attributes={"lang": "es", "published_at": str(published_at)}, + message_id="1", + publish_time=publish_time, + ) + + message = pubsub_v1.subscriber.message.Message( + rele_message._pb, + "ack-id", + delivery_attempt=1, + request_queue=queue.Queue(), + ) + message.ack = MagicMock(autospec=True) + return message + + +@pytest.fixture +def expected_message_log(): + return 'Message {\n data: b\'{"foo": "bar"}\'\n ordering_key: \'\'\n attributes: {\n "lang": "es",\n "published_at": "1560244246.863829"\n }\n}' + + +class TestLoggingMiddleware: + @pytest.fixture + def logging_middleware(self, config): + logging_middleware = LoggingMiddleware() + logging_middleware.setup(config) + return logging_middleware + + def test_message_payload_log_is_converted_to_string_on_post_publish_failure( + self, + logging_middleware, + caplog, + message_data, + expected_message_data_log, + ): + logging_middleware.post_publish_failure( + sub_stub, RuntimeError("💩"), message_data + ) + + message_log = caplog.records[0].subscription_message + + assert message_log == expected_message_data_log + + def test_message_payload_log_is_converted_to_string_on_post_process_message_failure( + self, + logging_middleware, + caplog, + message_wrapper, + expected_message_log, + ): + logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, message_wrapper + ) + + message_log = caplog.records[0].subscription_message + + assert message_log == expected_message_log + + def test_post_publish_failure_message_payload_format_matches_post_process_message_failure_payload_type( + self, + logging_middleware, + caplog, + message_data, + message_wrapper, + ): + logging_middleware.post_publish_failure( + sub_stub, RuntimeError("💩"), message_data + ) + logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, message_wrapper + ) + + post_publish_failure_message_log = caplog.records[0].subscription_message + post_process_message_message_log = caplog.records[1].subscription_message + + assert type(post_publish_failure_message_log) == type( + post_process_message_message_log + ) diff --git a/tests/contrib/test_verbose_logging_middleware.py b/tests/contrib/test_verbose_logging_middleware.py new file mode 100644 index 0000000..5696f44 --- /dev/null +++ b/tests/contrib/test_verbose_logging_middleware.py @@ -0,0 +1,97 @@ +import queue +from unittest.mock import MagicMock + +import pytest +from google.cloud import pubsub_v1 +from tests.subs import sub_stub + +from rele.contrib.logging_middleware import LoggingMiddleware +from rele.contrib.verbose_logging_middleware import VerboseLoggingMiddleware + + +@pytest.fixture +def long_message_wrapper(published_at, publish_time): + long_string = "A" * 100 + rele_message = pubsub_v1.types.PubsubMessage( + data=long_string.encode("utf-8"), + attributes={"lang": "es", "published_at": str(published_at)}, + message_id="1", + publish_time=publish_time, + ) + + message = pubsub_v1.subscriber.message.Message( + rele_message._pb, + "ack-id", + delivery_attempt=1, + request_queue=queue.Queue(), + ) + message.ack = MagicMock(autospec=True) + return message + + +@pytest.fixture +def message_wrapper(published_at, publish_time): + rele_message = pubsub_v1.types.PubsubMessage( + data="ABCDE".encode("utf-8"), + attributes={"lang": "es", "published_at": str(published_at)}, + message_id="1", + publish_time=publish_time, + ) + + message = pubsub_v1.subscriber.message.Message( + rele_message._pb, + "ack-id", + delivery_attempt=1, + request_queue=queue.Queue(), + ) + message.ack = MagicMock(autospec=True) + return message + + +@pytest.fixture +def expected_message_log(): + return 'Message {\n data: b\'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\'\n ordering_key: \'\'\n attributes: {\n "lang": "es",\n "published_at": "1560244246.863829"\n }\n}' + + +class TestVerboseLoggingMiddleware: + @pytest.fixture + def verbose_logging_middleware(self, config): + verbose_logging_middleware = VerboseLoggingMiddleware() + verbose_logging_middleware.setup(config) + return verbose_logging_middleware + + @pytest.fixture + def logging_middleware(self, config): + logging_middleware = LoggingMiddleware() + logging_middleware.setup(config) + return logging_middleware + + def test_message_payload_log_is_not_truncated_on_post_process_failure( + self, + verbose_logging_middleware, + caplog, + long_message_wrapper, + expected_message_log, + ): + verbose_logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, long_message_wrapper + ) + + message_log = caplog.records[0].subscription_message + + assert message_log == expected_message_log + + def test_post_process_failure_message_payload_format_matches_logging_middleware_format( + self, verbose_logging_middleware, logging_middleware, caplog, message_wrapper + ): + verbose_logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, message_wrapper + ) + logging_middleware.post_process_message_failure( + sub_stub, RuntimeError("💩"), 1, message_wrapper + ) + + verbose_message_log = caplog.records[0].subscription_message + message_log = caplog.records[1].subscription_message + + assert verbose_message_log == message_log diff --git a/tests/dummy-pub-sub-credentials.json b/tests/dummy-pub-sub-credentials.json index d1abe8e..2379c7f 100644 --- a/tests/dummy-pub-sub-credentials.json +++ b/tests/dummy-pub-sub-credentials.json @@ -1,12 +1,12 @@ { "type": "service_account", - "project_id": "django-pubsub", + "project_id": "rele-test", "private_key_id": "ed80f7f63835d4e67ec7d613e65d4d1265c9adb2", "private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIICWgIBAAKBgFgykBgkvE7KsUNU4/el3mFrlGk8G1tO1trR4BoRWttD/wnppzk7\nO+zSeqYpTOKeKNfnz8oQcjVLothVxzhd4+/Id5U0LLVwpT79I9VuhYExMLU9mKYm\niusaJqS+knmB1RkStiZk1n17aTIsaMahgfdcgDUtJI50NksoJHVGyfvfAgMBAAEC\ngYAlFBzACbGg7lXXmLi+RF1ZV4DtPPfDS0HIfLNaQjGQPOXbpP9IcD6hMVuev34z\nR4qkOjCBIqjg/wtXJ7i5Wb+ZcLGYWmKLUTYH425ApIXnCoyn6v9bbIckq8DgNi+o\nzc9OBuVjR3dEWYvmFfn3uv4y8ZR94eaw7vBy+NRa0W9MAQJBAJjEHPXUgb8Ve4/j\nFGQkPQUHnmUXFf8mKDXpDKKiDBFglemFBGo9z69BrNhzQZrHSMbQUMbwxS/Nrnmj\nA2kFZJECQQCTzFx7DAhluvCudaQGdDAZPuuDZwqK+ZArjEZtQn/lrQshlAxzqOwU\njTXsfPliZmiQ3FkKlSKP6tMBsImZSdFvAkARajHe+FW+IcXPNlTJwbPPEfpFject\nCf2Ff8a3938mr/sG/uns7pTxZqw8lI8DBPrP50l+FE52T503MpUd8MZxAkAYrkfD\nRH8ifdUzTPHXIg/mJ1us1cgs7P/mRcZ8+F3jPMJfGRn7Nno19F7M3xHGHNPZXPKB\nkeXzooMaBSD1OB6BAkBZL/b0jSLa2AwP99v5EgEO7+Y8ezv9Hsyb6hVtbDNPBA6Y\ncWDVXslUKXrPfOgJKd/hqRiVixbzNr4b+qIv4vRO\n-----END RSA PRIVATE KEY-----", - "client_email": "django-pubsubs@django-pubsub.com", + "client_email": "rele@rele.com", "client_id": "114778270906188590090", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/djang-pubsub%40django-pubsub.com" + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/rele%40rele.com" } diff --git a/tests/more_subs/__init__.py b/tests/more_subs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/more_subs/subs.py b/tests/more_subs/subs.py new file mode 100644 index 0000000..f32ede5 --- /dev/null +++ b/tests/more_subs/subs.py @@ -0,0 +1,6 @@ +from rele import sub + + +@sub(topic="another-cool-topic", prefix="rele") +def another_sub_stub(data, **kwargs): + return data["id"] diff --git a/tests/settings.py b/tests/settings.py index 8b4105e..06b6316 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -1,14 +1,11 @@ -# -*- coding: utf-8 -from __future__ import unicode_literals, absolute_import +from __future__ import absolute_import, unicode_literals import os from logging import config as logging_config # Build paths inside the project like this: os.path.join(BASE_DIR, ...) -from google.oauth2 import service_account BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - SECRET_KEY = "Im-so-secret" DEBUG = True @@ -25,24 +22,23 @@ MIDDLEWARE_CLASSES = () -# Google Cloud -RELE_GC_PROJECT_ID = 'SOME-PROJECT-ID' -RELE_GC_CREDENTIALS = service_account.Credentials.from_service_account_file( - f'{BASE_DIR}/tests/dummy-pub-sub-credentials.json' +DATABASES = {"default": {"CONN_MAX_AGE": 0}} + +RELE = { + "APP_NAME": "test-rele", + "GC_CREDENTIALS_PATH": f"{BASE_DIR}/tests/dummy-pub-sub-credentials.json", + "SUB_PREFIX": "rele", + "MIDDLEWARE": [ + "rele.contrib.LoggingMiddleware", + "rele.contrib.DjangoDBMiddleware", + ], +} + +logging_config.dictConfig( + { + "version": 1, + "disable_existing_loggers": False, + "handlers": {"console": {"class": "logging.StreamHandler"}}, + "loggers": {"": {"level": "INFO", "handlers": ["console"]}}, + } ) - -logging_config.dictConfig({ - 'version': 1, - 'disable_existing_loggers': False, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - }, - }, - 'loggers': { - '': { - 'level': 'INFO', - 'handlers': ['console'], - }, - }, -}) diff --git a/tests/subs.py b/tests/subs.py new file mode 100644 index 0000000..43cd922 --- /dev/null +++ b/tests/subs.py @@ -0,0 +1,25 @@ +from rele import Subscription, sub + + +@sub(topic="some-cool-topic", prefix="rele") +def sub_stub(data, **kwargs): + return data["id"] + + +class ClassBasedSub(Subscription): + topic = "alternative-cool-topic" + + def __init__(self): + self._func = self.callback_func + super().__init__(self._func, self.topic) + + def callback_func(self, data, **kwargs): + return data["id"] + + +class CustomException(Exception): + pass + + +def some_other_type(): + print("Im a function") diff --git a/tests/test_client.py b/tests/test_client.py index 026b53f..249a50f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,21 +1,265 @@ -from unittest.mock import ANY, MagicMock +import concurrent +import decimal +import logging +from concurrent.futures import TimeoutError +from unittest.mock import ANY, patch -from google.cloud.pubsub_v1 import PublisherClient +import pytest +from google.api_core import exceptions +from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient -from rele import Publisher +from rele.subscription import Subscription +@pytest.mark.usefixtures("publisher", "time_mock") class TestPublisher: + def test_returns_future_when_published_called(self, published_at, publisher): + message = {"foo": "bar"} + result = publisher.publish( + topic="order-cancelled", data=message, myattr="hello" + ) - def test_returns_true_when_published_called(self): - message = {'foo': 'bar'} - publisher = Publisher() - publisher._client = MagicMock(spec=PublisherClient) + assert isinstance(result, concurrent.futures.Future) - result = publisher.publish(topic='order-cancelled', - data=message, - event_type='cancellation') + publisher._client.publish.assert_called_with( + ANY, + b'{"foo": "bar"}', + myattr="hello", + published_at=str(published_at), + ) + + def test_save_log_when_published_called(self, published_at, publisher, caplog): + caplog.set_level(logging.DEBUG) + message = {"foo": "bar"} + publisher.publish(topic="order-cancelled", data=message, myattr="hello") + + log = caplog.records[0] + + assert log.message == "Publishing to order-cancelled" + assert log.pubsub_publisher_attrs == { + "myattr": "hello", + "published_at": str(published_at), + } + assert log.metrics == { + "name": "publications", + "data": {"agent": "rele", "topic": "order-cancelled"}, + } + + def test_publish_sets_published_at(self, published_at, publisher): + publisher.publish(topic="order-cancelled", data={"foo": "bar"}) - assert result is True publisher._client.publish.assert_called_with( - ANY, b'{"foo":"bar"}', event_type='cancellation') + ANY, b'{"foo": "bar"}', published_at=str(published_at) + ) + + def test_publishes_data_with_custom_encoder(self, publisher, custom_encoder): + publisher._encoder = custom_encoder + publisher.publish(topic="order-cancelled", data=decimal.Decimal("1.20")) + + publisher._client.publish.assert_called_with(ANY, b"1.2", published_at=ANY) + + def test_publishes_data_with_client_timeout_when_blocking( + self, mock_future, publisher + ): + publisher._timeout = 100.0 + publisher.publish(topic="order-cancelled", data={"foo": "bar"}, blocking=True) + + publisher._client.publish.return_value = mock_future + publisher._client.publish.assert_called_with( + ANY, b'{"foo": "bar"}', published_at=ANY + ) + mock_future.result.assert_called_once_with(timeout=100) + + def test_publishes_data_with_client_timeout_when_blocking_and_timeout_specified( + self, mock_future, publisher + ): + publisher._timeout = 100.0 + publisher.publish( + topic="order-cancelled", + data={"foo": "bar"}, + blocking=True, + timeout=50, + ) + + publisher._client.publish.return_value = mock_future + publisher._client.publish.assert_called_with( + ANY, b'{"foo": "bar"}', published_at=ANY + ) + mock_future.result.assert_called_once_with(timeout=50) + + def test_runs_post_publish_failure_hook_when_future_result_raises_timeout( + self, mock_future, publisher, mock_post_publish_failure + ): + message = {"foo": "bar"} + exception = TimeoutError() + mock_future.result.side_effect = exception + + with pytest.raises(TimeoutError): + publisher.publish( + topic="order-cancelled", data=message, myattr="hello", blocking=True + ) + mock_post_publish_failure.assert_called_once_with( + "order-cancelled", exception, {"foo": "bar"} + ) + + def test_raises_when_timeout_error_and_raise_exception_is_true( + self, publisher, mock_future + ): + message = {"foo": "bar"} + e = TimeoutError() + mock_future.result.side_effect = e + + with pytest.raises(TimeoutError): + publisher.publish( + topic="order-cancelled", + data=message, + myattr="hello", + blocking=True, + raise_exception=True, + ) + + def test_returns_future_when_timeout_error_and_raise_exception_is_false( + self, publisher, mock_future + ): + message = {"foo": "bar"} + e = TimeoutError() + mock_future.result.side_effect = e + + result = publisher.publish( + topic="order-cancelled", + data=message, + myattr="hello", + blocking=True, + raise_exception=False, + ) + + assert result is mock_future + + +class TestSubscriber: + @pytest.fixture(autouse=True) + def mock_create_topic(self): + with patch.object( + PublisherClient, "create_topic", return_values={"name": "test-topic"} + ) as mock: + yield mock + + @patch.object(SubscriberClient, "create_subscription") + def test_creates_subscription_with_default_ack_deadline_when_none_provided( + self, + _mocked_client, + project_id, + subscriber, + ): + expected_subscription = ( + f"projects/{project_id}/subscriptions/" f"{project_id}-test-topic" + ) + expected_topic = f"projects/{project_id}/topics/" f"{project_id}-test-topic" + + subscriber.create_subscription( + Subscription(None, topic=f"{project_id}-test-topic") + ) + _mocked_client.assert_called_once_with( + request={ + "ack_deadline_seconds": 60, + "name": expected_subscription, + "topic": expected_topic, + } + ) + assert subscriber._gc_project_id == "rele-test" + + @patch.object(SubscriberClient, "create_subscription") + def test_creates_subscription_with_custom_ack_deadline_when_provided( + self, _mocked_client, project_id, subscriber + ): + expected_subscription = ( + f"projects/{project_id}/subscriptions/" f"{project_id}-test-topic" + ) + expected_topic = f"projects/{project_id}/topics/" f"{project_id}-test-topic" + subscriber._ack_deadline = 100 + subscriber.create_subscription( + Subscription(None, topic=f"{project_id}-test-topic") + ) + + _mocked_client.assert_called_once_with( + request={ + "ack_deadline_seconds": 100, + "name": expected_subscription, + "topic": expected_topic, + } + ) + + @patch.object(SubscriberClient, "create_subscription") + def test_creates_subscription_with_backend_filter_by_when_provided( + self, _mocked_client, project_id, subscriber + ): + expected_subscription = ( + f"projects/{project_id}/subscriptions/" f"{project_id}-test-topic" + ) + expected_topic = f"projects/{project_id}/topics/" f"{project_id}-test-topic" + backend_filter_by = "attributes:domain" + subscriber.create_subscription( + Subscription( + None, + topic=f"{project_id}-test-topic", + backend_filter_by=backend_filter_by, + ) + ) + + _mocked_client.assert_called_once_with( + request={ + "ack_deadline_seconds": 60, + "name": expected_subscription, + "topic": expected_topic, + "filter": backend_filter_by, + } + ) + + @patch.object( + SubscriberClient, + "create_subscription", + side_effect=exceptions.AlreadyExists("Subscription already exists"), + ) + def test_does_not_raise_when_subscription_already_exists( + self, _mocked_client, project_id, subscriber + ): + subscriber.create_subscription( + Subscription(None, topic=f"{project_id}-test-topic") + ) + + _mocked_client.assert_called() + + @patch.object( + SubscriberClient, + "create_subscription", + side_effect=[exceptions.NotFound("Subscription topic does not exist"), True], + ) + def test_creates_topic_when_subscription_topic_does_not_exist( + self, _mocked_client, project_id, subscriber, mock_create_topic + ): + expected_subscription = ( + f"projects/{project_id}/subscriptions/" f"{project_id}-test-topic" + ) + expected_topic = f"projects/{project_id}/topics/" f"{project_id}-test-topic" + backend_filter_by = "attributes:domain" + subscriber.create_subscription( + Subscription( + None, + topic=f"{project_id}-test-topic", + backend_filter_by=backend_filter_by, + ) + ) + + assert _mocked_client.call_count == 2 + mock_create_topic.assert_called_with( + request={"name": f"projects/rele-test/topics/{project_id}-test-topic"} + ) + + _mocked_client.assert_called_with( + request={ + "ack_deadline_seconds": 60, + "name": expected_subscription, + "topic": expected_topic, + "filter": backend_filter_by, + } + ) diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..7a26be4 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,143 @@ +import json +import os +from unittest.mock import patch + +import google +import pytest +from google.oauth2 import service_account + +from rele import Subscription, sub +from rele.config import Config, load_subscriptions_from_paths + + +@sub(topic="test-topic", prefix="rele") +def sub_stub(data, **kwargs): + return data["id"] + + +@sub(topic="another-cool-topic", prefix="rele") +def another_sub_stub(data, **kwargs): + return data["id"] + + +class TestLoadSubscriptions: + @pytest.fixture + def subscriptions(self): + return load_subscriptions_from_paths( + ["tests.test_config"], + sub_prefix="test", + filter_by=[lambda args: args.get("lang") == "en"], + ) + + def test_load_subscriptions_in_a_module(self, subscriptions): + assert len(subscriptions) == 2 + func_sub = subscriptions[-1] + assert isinstance(func_sub, Subscription) + assert func_sub.name == "rele-test-topic" + assert func_sub({"id": 4}, lang="en") == 4 + + def test_loads_subscriptions_when_they_are_class_based(self): + subscriptions = load_subscriptions_from_paths( + ["tests.subs"], + sub_prefix="test", + filter_by=[lambda attrs: attrs.get("lang") == "en"], + ) + + assert len(subscriptions) == 2 + klass_sub = subscriptions[0] + assert isinstance(klass_sub, Subscription) + assert klass_sub.name == "test-alternative-cool-topic" + assert klass_sub({"id": 4}, lang="en") == 4 + + def test_raises_error_when_subscription_is_duplicated(self): + with pytest.raises(RuntimeError) as excinfo: + load_subscriptions_from_paths(["tests.test_config", "tests.more_subs.subs"]) + + assert ( + str(excinfo.value) + == "Duplicate subscription name found: rele-another-cool-topic. Subs " + "tests.more_subs.subs.another_sub_stub and tests.test_config.another_sub_stub collide." + ) + + def test_returns_sub_value_when_filtered_value_applied(self, subscriptions): + + assert subscriptions[-1]({"id": 4}, lang="en") == 4 + + def test_returns_none_when_filtered_value_does_not_apply(self, subscriptions): + + assert subscriptions[0]({"id": 4}, lang="es") is None + + +class TestConfig: + def test_parses_all_keys(self, project_id, custom_encoder): + settings = { + "APP_NAME": "rele", + "SUB_PREFIX": "rele", + "GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json", + "MIDDLEWARE": ["rele.contrib.DjangoDBMiddleware"], + "ENCODER": custom_encoder, + } + + config = Config(settings) + + assert config.app_name == "rele" + assert config.sub_prefix == "rele" + assert config.gc_project_id == project_id + assert isinstance(config.credentials, service_account.Credentials) + assert config.middleware == ["rele.contrib.DjangoDBMiddleware"] + + def test_inits_service_account_creds_when_credential_path_given(self, project_id): + settings = { + "GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json", + } + + config = Config(settings) + + assert config.gc_project_id == project_id + assert isinstance(config.credentials, google.oauth2.service_account.Credentials) + assert config.credentials.project_id == "rele-test" + + def test_uses_project_id_from_creds_when_no_project_id_given(self): + settings = { + "GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json", + } + + config = Config(settings) + + assert isinstance(config.credentials, google.oauth2.service_account.Credentials) + assert config.credentials.project_id == "rele-test" + assert config.gc_project_id == "rele-test" + + @patch.dict(os.environ, {"GOOGLE_APPLICATION_CREDENTIALS": ""}) + def test_sets_defaults(self): + settings = {} + + config = Config(settings) + + assert config.app_name is None + assert config.sub_prefix is None + assert config.gc_project_id is None + assert config.credentials is None + assert config.middleware == ["rele.contrib.LoggingMiddleware"] + assert config.encoder == json.JSONEncoder + + @patch.dict( + os.environ, + { + "GOOGLE_APPLICATION_CREDENTIALS": os.path.dirname( + os.path.realpath(__file__) + ) + + "/dummy-pub-sub-credentials.json" + }, + ) + def test_sets_defaults_pulled_from_env(self, monkeypatch, project_id): + settings = {} + + config = Config(settings) + + assert config.app_name is None + assert config.sub_prefix is None + assert config.gc_project_id == "rele-test" + assert isinstance(config.credentials, google.oauth2.service_account.Credentials) + assert config.middleware == ["rele.contrib.LoggingMiddleware"] + assert config.encoder == json.JSONEncoder diff --git a/tests/test_discover.py b/tests/test_discover.py new file mode 100644 index 0000000..7e4f2d8 --- /dev/null +++ b/tests/test_discover.py @@ -0,0 +1,24 @@ +import pytest +from tests import settings + +from rele import discover + + +class TestDiscoverSubModules: + def test_returns_settings_and_paths_when_settings_found(self): + discovered_settings, paths = discover.sub_modules("tests.settings") + + assert discovered_settings is settings + assert discovered_settings.RELE == settings.RELE + assert paths == ["tests.subs", "tests.more_subs.subs"] + + def test_returns_empty_settings_when_no_settings_module_found(self): + discovered_settings, paths = discover.sub_modules() + + assert discovered_settings is None + assert paths == ["tests.subs", "tests.more_subs.subs"] + + def test_raises_when_incorrect_path(self): + incorrect_path = "tests.foo" + with pytest.raises(ModuleNotFoundError): + discover.sub_modules(incorrect_path) diff --git a/tests/test_middleware.py b/tests/test_middleware.py new file mode 100644 index 0000000..57037bc --- /dev/null +++ b/tests/test_middleware.py @@ -0,0 +1,34 @@ +from unittest.mock import patch + +import pytest + +import rele +from rele.middleware import BaseMiddleware + + +class TestMiddleware: + @pytest.fixture + def mock_init_global_publisher(self): + with patch("rele.config.init_global_publisher", autospec=True) as m: + yield m + + @pytest.mark.usefixtures("mock_init_global_publisher") + @patch("rele.contrib.FlaskMiddleware.setup", autospec=True) + def test_setup_fn_is_called_with_kwargs(self, mock_middleware_setup, project_id): + + settings = { + "GC_PROJECT_ID": project_id, + "MIDDLEWARE": ["rele.contrib.FlaskMiddleware"], + } + + rele.setup(settings, foo="bar") + assert mock_middleware_setup.called + assert mock_middleware_setup.call_args_list[0][-1] == {"foo": "bar"} + + def test_warns_about_deprecated_hooks(self): + + with pytest.warns(DeprecationWarning): + + class TestMiddleware(BaseMiddleware): + def post_publish(self, topic): + pass diff --git a/tests/test_publishing.py b/tests/test_publishing.py new file mode 100644 index 0000000..4c7d4f8 --- /dev/null +++ b/tests/test_publishing.py @@ -0,0 +1,51 @@ +from unittest.mock import MagicMock, patch + +import pytest +from tests import settings + +from rele import Publisher, publishing + + +class TestPublish: + @patch("rele.publishing.Publisher", autospec=True) + def test_instantiates_publisher_and_publishes_when_does_not_exist( + self, mock_publisher + ): + with patch("rele.publishing.discover") as mock_discover: + mock_discover.sub_modules.return_value = settings, [] + + message = {"foo": "bar"} + publishing.publish(topic="order-cancelled", data=message, myattr="hello") + + mock_publisher.return_value.publish.assert_called_with( + "order-cancelled", {"foo": "bar"}, myattr="hello" + ) + + def test_raises_error_when_publisher_does_not_exists_and_settings_not_found(self): + publishing._publisher = None + message = {"foo": "bar"} + + with pytest.raises(ValueError): + publishing.publish(topic="order-cancelled", data=message, myattr="hello") + + +class TestInitGlobalPublisher: + @patch("rele.publishing.Publisher", autospec=True) + def test_creates_global_publisher_when_published_called( + self, mock_publisher, config + ): + publishing._publisher = None + mock_publisher.return_value = MagicMock(spec=Publisher) + publishing.init_global_publisher(config) + message = {"foo": "bar"} + publishing.publish(topic="order-cancelled", data=message, myattr="hello") + assert isinstance(publishing._publisher, Publisher) + publisher_id = id(publishing._publisher) + + mock_publisher.return_value.publish.assert_called_with( + "order-cancelled", {"foo": "bar"}, myattr="hello" + ) + + mock_publisher.return_value = MagicMock(spec=Publisher) + publishing.publish(topic="order-cancelled", data=message, myattr="hello") + assert id(publishing._publisher) == publisher_id diff --git a/tests/test_runpubsub.py b/tests/test_runpubsub.py deleted file mode 100644 index 339f170..0000000 --- a/tests/test_runpubsub.py +++ /dev/null @@ -1,17 +0,0 @@ -from unittest.mock import patch - -from django.core.management import call_command - -from rele.management.commands.runrele import Command - - -class TestRunReleCommand: - - @patch.object(Command, '_wait_forever', return_value=None) - @patch('rele.management.commands.runrele.Worker', autospec=True) - def test_calls_worker_start_and_setup_when_runrele( - self, mock_worker, mock_wait_forever): - call_command('runrele') - - mock_worker.return_value.setup.assert_called() - mock_worker.return_value.start.assert_called() diff --git a/tests/test_subscription.py b/tests/test_subscription.py index ebba8aa..6b1f800 100644 --- a/tests/test_subscription.py +++ b/tests/test_subscription.py @@ -1,75 +1,363 @@ import logging +import queue +import time from unittest.mock import MagicMock, patch import pytest from google.cloud import pubsub_v1 +from google.protobuf import timestamp_pb2 from rele import Callback, Subscription, sub +from rele.middleware import register_middleware logger = logging.getLogger(__name__) -@sub(topic='some-cool-topic') +@sub(topic="some-cool-topic", prefix="rele") def sub_stub(data, **kwargs): - logger.info(f'I am a task doing stuff with ID {data["id"]} ' - f'({kwargs["lang"]})') + logger.info(f'I am a task doing stuff with ID {data["id"]} ' f'({kwargs["lang"]})') + return data["id"] -class TestSubscription: +@sub(topic="some-fancy-topic") +def sub_fancy_stub(data, **kwargs): + logger.info( + f'I used to have a prefix, but not anymore, only {data["id"]}' + f'id {kwargs["lang"]}' + ) + return data["id"] + + +@sub(topic="published-time-type") +def sub_published_time_type(data, **kwargs): + logger.info(f'{type(kwargs["published_at"])}') + + +def landscape_filter(kwargs): + return kwargs.get("type") == "landscape" + + +def gif_filter(kwargs): + return kwargs.get("format") == "gif" + +@sub(topic="photo-updated", filter_by=landscape_filter) +def sub_process_landscape_photos(data, **kwargs): + return f'Received a photo of type {kwargs.get("type")}' + + +@sub(topic="photo-updated", filter_by=[landscape_filter, gif_filter]) +def sub_process_landscape_gif_photos(data, **kwargs): + return f'Received a {kwargs.get("format")} photo of type {kwargs.get("type")}' + + +class TestSubscription: def test_subs_return_subscription_objects(self): assert isinstance(sub_stub, Subscription) - assert sub_stub.topic == 'some-cool-topic' - assert sub_stub.name == 'rele-some-cool-topic' + assert sub_stub.topic == "some-cool-topic" + assert sub_stub.name == "rele-some-cool-topic" + + def test_subs_without_prefix_return_subscription_objects(self): + assert isinstance(sub_fancy_stub, Subscription) + assert sub_fancy_stub.topic == "some-fancy-topic" + assert sub_fancy_stub.name == "some-fancy-topic" def test_executes_callback_when_called(self, caplog): - res = sub_stub({'id': 123}, **{'lang': 'es'}) + res = sub_stub({"id": 123}, **{"lang": "es"}) - assert res is None + assert res == 123 log2 = caplog.records[0] - assert log2.message == 'I am a task doing stuff with ID 123 (es)' + assert log2.message == "I am a task doing stuff with ID 123 (es)" + def test_sub_executes_when_message_attributes_match_criteria(self): + data = {"name": "my_new_photo.jpeg"} + response = sub_process_landscape_photos(data, type="landscape") -class TestCallback: + assert response == "Received a photo of type landscape" + def test_sub_does_not_execute_when_message_attributes_dont_match_criteria( + self, + ): + data = {"name": "my_new_photo.jpeg"} + response = sub_process_landscape_photos(data, type="") + + assert response is None + + def test_sub_executes_when_message_attributes_matches_multiple_criterias( + self, + ): + data = {"name": "my_new_photo.jpeg"} + response = sub_process_landscape_gif_photos( + data, type="landscape", format="gif" + ) + + assert response == "Received a gif photo of type landscape" + + @pytest.mark.parametrize( + "type, format", + [ + ("portrait", "gif"), + ("landscape", "jpg"), + ("portrait", "jpg"), + (None, "gif"), + ("portrait", None), + (None, None), + ], + ) + def test_sub_is_not_executed_when_message_attribs_dont_match_all_criterias( + self, type, format + ): + data = {"name": "my_new_photo.jpeg"} + response = sub_process_landscape_gif_photos(data, type=type, format=format) + + assert response is None + + def test_raises_error_when_filter_by_is_not_valid(self, caplog): + Subscription( + func=lambda x: None, topic="topic", prefix="rele", filter_by=lambda x: True + ) + Subscription( + func=lambda x: None, + topic="topic", + prefix="rele", + filter_by=((lambda x: True),), + ) + + with pytest.raises(ValueError): + Subscription(func=lambda x: None, topic="topic", prefix="rele", filter_by=1) + + with pytest.raises(ValueError): + Subscription( + func=lambda x: None, topic="topic", prefix="rele", filter_by=(1,) + ) + + +class TestCallback: @pytest.fixture(autouse=True) def mock_close_old_connections(self): - with patch('rele.subscription.db.' - 'close_old_connections') as mock_old_connections: + with patch( + "rele.contrib.django_db_middleware.db." "close_old_connections" + ) as mock_old_connections: yield mock_old_connections @pytest.fixture - def message_wrapper(self): + def published_at(self): + return time.time() + + @pytest.fixture + def publish_time(self): + timestamp = timestamp_pb2.Timestamp() + timestamp.GetCurrentTime() + return timestamp + + @pytest.fixture + def message_wrapper(self, published_at, publish_time): rele_message = pubsub_v1.types.PubsubMessage( - data=b'{"id": 123}', attributes={'lang': 'es'}, message_id='1') - return pubsub_v1.subscriber.message.Message( - rele_message, 'ack-id', MagicMock()) + data=b'{"id": 123}', + attributes={"lang": "es", "published_at": str(published_at)}, + message_id="1", + publish_time=publish_time, + ) - def test_acks_message_when_callback_called(self, caplog, message_wrapper): - callback = Callback(sub_stub) - res = callback(message_wrapper) + message = pubsub_v1.subscriber.message.Message( + rele_message._pb, + "ack-id", + delivery_attempt=1, + request_queue=queue.Queue(), + ) + message.ack = MagicMock(autospec=True) + return message - assert res is None + @pytest.fixture + def message_wrapper_empty(self): + rele_message = pubsub_v1.types.PubsubMessage( + data=b"", attributes={"lang": "es"}, message_id="1" + ) + message = pubsub_v1.subscriber.message.Message( + rele_message, "ack-id", MagicMock() + ) + message.ack = MagicMock(autospec=True) + return message + + @pytest.fixture + def message_wrapper_invalid_json(self, publish_time): + rele_message = pubsub_v1.types.PubsubMessage( + data=b"foobar", + attributes={}, + message_id="1", + publish_time=publish_time, + ) + message = pubsub_v1.subscriber.message.Message( + rele_message._pb, + "ack-id", + delivery_attempt=1, + request_queue=queue.Queue(), + ) + message.ack = MagicMock(autospec=True) + return message + + def test_log_start_processing_when_callback_called( + self, caplog, message_wrapper, published_at + ): + with caplog.at_level(logging.DEBUG): + callback = Callback(sub_stub) + res = callback(message_wrapper) + + assert res == 123 log1 = caplog.records[0] - assert log1.message == ('Start processing message for ' - 'rele-some-cool-topic - sub_stub') - log2 = caplog.records[1] - assert log2.message == 'I am a task doing stuff with ID 123 (es)' + assert log1.message == ( + "Start processing message for " "rele-some-cool-topic - sub_stub" + ) + assert log1.metrics == { + "name": "subscriptions", + "data": { + "agent": "rele", + "topic": "some-cool-topic", + "status": "received", + "subscription": "rele-some-cool-topic", + "attributes": { + "lang": "es", + "published_at": str(published_at), + }, + }, + } + + def test_acks_message_when_execution_successful(self, caplog, message_wrapper): + with caplog.at_level(logging.DEBUG): + callback = Callback(sub_stub) + res = callback(message_wrapper) + + assert res == 123 + message_wrapper.ack.assert_called_once() + assert len(caplog.records) == 3 + message_wrapper_log = caplog.records[1] + assert message_wrapper_log.message == ( + "I am a task doing " "stuff with ID 123 (es)" + ) + + def test_log_when_execution_is_succesful( + self, message_wrapper, caplog, published_at + ): + callback = Callback(sub_stub) + callback(message_wrapper) + + success_log = caplog.records[-1] + assert success_log.message == ( + "Successfully processed message for " "rele-some-cool-topic - sub_stub" + ) + assert success_log.metrics == { + "name": "subscriptions", + "data": { + "agent": "rele", + "topic": "some-cool-topic", + "status": "succeeded", + "subscription": "rele-some-cool-topic", + "duration_seconds": pytest.approx(0.5, abs=0.5), + "attributes": { + "lang": "es", + "published_at": str(published_at), + }, + }, + } - def test_does_not_ack_message_when_callback_raises( - self, caplog, message_wrapper): - @sub(topic='some-cool-topic') - def crashy_sub_stub(message, **kwargs): - raise ValueError('I am an exception from a sub') + def test_log_does_not_ack_called_message_when_execution_fails( + self, caplog, message_wrapper, published_at + ): + @sub(topic="some-cool-topic", prefix="rele") + def crashy_sub_stub(data, **kwargs): + raise ValueError("I am an exception from a sub") callback = Callback(crashy_sub_stub) res = callback(message_wrapper) assert res is None - log1 = caplog.records[0] - assert log1.message == ('Start processing message for rele-' - 'some-cool-topic - crashy_sub_stub') - log2 = caplog.records[1] - assert log2.message == ('Exception raised while processing message for' - ' rele-some-cool-topic - ' - 'crashy_sub_stub: ValueError') + message_wrapper.ack.assert_not_called() + failed_log = caplog.records[-1] + assert failed_log.message == ( + "Exception raised while processing " + "message for rele-some-cool-topic - " + "crashy_sub_stub: ValueError" + ) + assert failed_log.metrics == { + "name": "subscriptions", + "data": { + "agent": "rele", + "topic": "some-cool-topic", + "status": "failed", + "subscription": "rele-some-cool-topic", + "duration_seconds": pytest.approx(0.5, abs=0.5), + "attributes": { + "lang": "es", + "published_at": str(published_at), + }, + }, + } + assert failed_log.subscription_message == str(message_wrapper) + + def test_log_acks_called_message_when_not_json_serializable( + self, caplog, message_wrapper_invalid_json, published_at + ): + callback = Callback(sub_stub) + res = callback(message_wrapper_invalid_json) + + assert res is None + message_wrapper_invalid_json.ack.assert_called_once() + failed_log = caplog.records[-1] + assert failed_log.message == ( + "Exception raised while processing " + "message for rele-some-cool-topic - " + "sub_stub: JSONDecodeError" + ) + assert failed_log.metrics == { + "name": "subscriptions", + "data": { + "agent": "rele", + "topic": "some-cool-topic", + "status": "failed", + "subscription": "rele-some-cool-topic", + "duration_seconds": pytest.approx(0.5, abs=0.5), + "attributes": {}, + }, + } + assert failed_log.subscription_message == str(message_wrapper_invalid_json) + + def test_published_time_as_message_attribute(self, message_wrapper, caplog): + callback = Callback(sub_published_time_type) + callback(message_wrapper) + + success_log = caplog.records[-2] + assert success_log.message == "" + + def test_old_django_connections_closed_when_middleware_is_used( + self, mock_close_old_connections, message_wrapper, config + ): + config.middleware = ["rele.contrib.DjangoDBMiddleware"] + register_middleware(config) + callback = Callback(sub_stub) + res = callback(message_wrapper) + + assert res == 123 + assert mock_close_old_connections.call_count == 2 + + +class TestDecorator: + def test_returns_subscription_when_callback_valid(self): + subscription = sub(topic="topic", prefix="rele")(lambda data, **kwargs: None) + assert isinstance(subscription, Subscription) + + def test_raises_error_when_function_signature_is_not_valid(self): + with pytest.raises(RuntimeError): + sub(topic="topic", prefix="rele")(lambda: None) + + with pytest.raises(RuntimeError): + sub(topic="topic", prefix="rele")(lambda data: None) + + with pytest.raises(RuntimeError): + sub(topic="topic", prefix="rele")(lambda data, value=None: None) + + def test_logs_warning_when_function_not_in_subs_module(self, caplog): + sub(topic="topic", prefix="rele")(lambda data, **kwargs: None) + assert ( + "Subscription function tests.test_subscription. is outside a subs " + "module that will not be discovered." in caplog.text + ) diff --git a/tests/test_worker.py b/tests/test_worker.py index 87cce3b..94d610d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,46 +1,164 @@ +import time +from concurrent import futures from unittest.mock import ANY, patch import pytest +from google.cloud import pubsub_v1 +from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler from rele import Subscriber, Worker, sub +from rele.middleware import register_middleware +from rele.subscription import Callback +from rele.worker import create_and_run -@sub(topic='some-cool-topic') +@sub(topic="some-cool-topic", prefix="rele") def sub_stub(data, **kwargs): - print(f'I am a task doing stuff.') + print(f"I am a task doing stuff.") -class TestWorker: +@pytest.fixture +def worker(config): + subscriptions = (sub_stub,) + return Worker( + subscriptions, + config.gc_project_id, + config.credentials, + default_ack_deadline=60, + threads_per_subscription=10, + ) + + +@pytest.fixture +def mock_consume(config): + with patch.object(Subscriber, "consume") as m: + client = pubsub_v1.SubscriberClient(credentials=config.credentials) + m.return_value = client.subscribe("dummy-subscription", Callback(sub_stub)) + yield m + + +@pytest.fixture +def mock_create_subscription(): + with patch.object(Subscriber, "create_subscription") as m: + yield m - @patch.object(Subscriber, 'subscribe') + +class TestWorker: def test_start_subscribes_and_saves_futures_when_subscriptions_given( - self, mock_subscribe): - subscriptions = (sub_stub,) - worker = Worker(subscriptions) + self, mock_consume, worker + ): worker.start() - mock_subscribe.assert_called_once_with( - subscription_name='rele-some-cool-topic', - callback=ANY + mock_consume.assert_called_once_with( + subscription_name="rele-some-cool-topic", + callback=ANY, + scheduler=ANY, ) + scheduler = mock_consume.call_args_list[0][1]["scheduler"] + assert isinstance(scheduler, ThreadScheduler) + assert isinstance(scheduler._executor, futures.ThreadPoolExecutor) - @patch.object(Subscriber, 'create_subscription') - def test_setup_creates_subscription_when_topic_given( - self, mock_create_subscription): - subscriptions = (sub_stub,) - worker = Worker(subscriptions) - worker.setup() + @patch.object(Worker, "_wait_forever") + def test_run_sets_up_and_creates_subscriptions_when_called( + self, mock_wait_forever, mock_consume, mock_create_subscription, worker + ): + worker.run_forever() - topic = 'some-cool-topic' - subscription = 'rele-some-cool-topic' - mock_create_subscription.assert_called_once_with(subscription, topic) + mock_create_subscription.assert_called_once_with(sub_stub) + mock_consume.assert_called_once_with( + subscription_name="rele-some-cool-topic", + callback=ANY, + scheduler=ANY, + ) + scheduler = mock_consume.call_args_list[0][1]["scheduler"] + assert isinstance(scheduler, ThreadScheduler) + assert isinstance(scheduler._executor, futures.ThreadPoolExecutor) + mock_wait_forever.assert_called_once() - @patch('rele.worker.db.connections.close_all') - def test_stop_closes_db_connections(self, mock_db_close_all): - subscriptions = (sub_stub,) - worker = Worker(subscriptions) + @patch.object(Worker, "_wait_forever") + @pytest.mark.usefixtures("mock_consume", "mock_create_subscription") + def test_wait_forevers_for_custom_time_period_when_called_with_argument( + self, mock_wait_forever, worker + ): + worker.run_forever(sleep_interval=127) + + mock_wait_forever.assert_called_once() + + @patch("rele.contrib.django_db_middleware.db.connections.close_all") + def test_stop_closes_db_connections(self, mock_db_close_all, config, worker): + config.middleware = ["rele.contrib.DjangoDBMiddleware"] + register_middleware(config=config) with pytest.raises(SystemExit): worker.stop() mock_db_close_all.assert_called_once() + + @pytest.mark.usefixtures("mock_create_subscription") + def test_creates_subscription_with_custom_ack_deadline_from_environment( + self, config + ): + subscriptions = (sub_stub,) + custom_ack_deadline = 234 + worker = Worker( + subscriptions, + config.gc_project_id, + config.credentials, + custom_ack_deadline, + threads_per_subscription=10, + ) + worker.setup() + + assert worker._subscriber._ack_deadline == custom_ack_deadline + assert worker._subscriber._gc_project_id == "rele-test" + + +@pytest.mark.usefixtures("mock_create_subscription") +class TestRestartConsumer: + @pytest.fixture(autouse=True) + def mock_sleep(self): + with patch.object(time, "sleep", side_effect=ValueError) as m: + yield m + + def test_does_not_restart_consumption_when_everything_goes_well( + self, worker, mock_consume + ): + with pytest.raises(ValueError): + worker.run_forever() + + assert len(mock_consume.call_args_list) == 1 + + def test_restarts_consumption_when_future_is_cancelled(self, worker, mock_consume): + mock_consume.return_value.cancel() + + with pytest.raises(ValueError): + worker.run_forever() + + assert len(mock_consume.call_args_list) == 2 + + def test_restarts_consumption_when_future_is_done(self, worker, mock_consume): + mock_consume.return_value.set_result(True) + + with pytest.raises(ValueError): + worker.run_forever() + + assert len(mock_consume.call_args_list) == 2 + + +class TestCreateAndRun: + @pytest.fixture(autouse=True) + def worker_wait_forever(self): + with patch.object(Worker, "_wait_forever", return_value=None) as p: + yield p + + @pytest.fixture + def mock_worker(self): + with patch("rele.worker.Worker", autospec=True) as p: + yield p + + def test_waits_forever_when_called_with_config_and_subs(self, config, mock_worker): + subscriptions = (sub_stub,) + create_and_run(subscriptions, config) + + mock_worker.assert_called_with(subscriptions, "rele-test", ANY, 60, 2) + mock_worker.return_value.run_forever.assert_called_once_with() diff --git a/tox.ini b/tox.ini deleted file mode 100644 index c9654ff..0000000 --- a/tox.ini +++ /dev/null @@ -1,19 +0,0 @@ -[tox] -envlist = - {py35,py36}-django-20 - {py35,py36}-django-111 - {py36,py37}-django-master - -[testenv] -setenv = - PYTHONPATH = {toxinidir}:{toxinidir}/rele -commands = coverage run --source rele runtests.py -deps = - django-111: Django>=1.11,<1.12 - django-20: Django>=2.0,<2.1 - django-master: https://github.com/django/django/archive/master.tar.gz - -r{toxinidir}/requirements_test.txt -basepython = - py37: python3.7 - py36: python3.6 - py35: python3.5