Djangoで非同期処理を実装する方法(Celery、Redis)Macローカル編のイメージ画像

Djangoで非同期処理を実装する方法(Celery、Redis)Macローカル編

  • 公開日:2019/06/04
  • 更新日:2020/05/07
  • 投稿者:n bit

Celeryや、Redisを利用してDjangoに非同期処理を実装する方法について解説します。今回はMacローカルでの開発環境用に非同期処理の環境を構築し、実際に簡易な関数を実行、DBにバックエンドの内容を保存し結果を画面上に表示するするところまでの解説です。

  • Django

この記事は約 分で読めます。(文字)

非同期処理の利用場面と重要性

Djangoを使ってクラウドアプリなどを開発したときに、非常に処理の重い関数などを記述していた場合にはブラウザ経由でアクセスすると大抵サーバのタイムアウト問題等に引っかかります。

サーバのタイムアウトまでの時間を出来る限り伸ばしタイムアウト問題を回避する方法もありますが、処理が終わるまでの時間ユーザにとっては待ち時間となり、その他の操作を行うことができなくなります。

これはUXの観点から言ってもあまり良い状況とは言えません。このようなケースに対してサーバのタイムアウト時間を伸ばすのではなく非同期処理を利用しhttpレスポンスをすぐに返すことでユーザの待ち時間をなくし、かつ、サーバのタイムアウト問題も回避することができるようになります。

通常のUX

非同期処理実装前

  • ブラウザからサーバにhttpリクエストを送る
  • 重い処理を即実行(実行中、ブラウザ側は待機中)
  • 処理結果をhttpレスポンスとして返す

非同期処理を行わない通常のUXの場合、リクエストを受け取ってから処理が完了し結果を返せるまでブラウザは待機中となってしまいます。

非同期のUX

非同期処理実装後

  • ブラウザからサーバにhttpリクエストを送る
  • 重い処理を実行プロセスに登録
  • 任意の内容をhttpレスポンスとしてすぐに返す

非同期処理を実装したUXの場合、実行処理を別のプロセスに登録しすぐにレスポンスを返すためブラウザ側ではすぐに次の操作を行うことができます。

重たい処理は別プロセスで処理中のためブラウザ側にレスポンスとして結果を返せないため、「実行処理を開始しました。」等、任意の内容をレスポンスとして一旦返す設定が必要です。

Djangoで非同期処理を実装するパッケージと役割

Djangoで非同期処理を実装するのに利用できるパッケージやモジュールはいくつかありますが、その中でも比較的多く利用されているCeleryとRedisを今回は利用します。

下記画像は、Django、Celery、Redisを利用した非同期処理の全体的な流れを表したものです。次のセクションで各名称とそれぞれの役割について解説しました。

非同期処理全体図(Django、Celery、Redis)

名称と役割

非同期処理を実装するにあたっていくつか普段聞き慣れない名称が登場します。用語を理解しておくことでこの後の解説もスムーズに理解しやすくなりますので、まずは全体の処理の流れ中でそれぞれの役割と名称について理解しておきましょう。

タスク(Task)

非同期で実行させる処理内容をひとかたまりにまとめたもの。ブラウザからのリクエストによって必要となった実行処理する関数やその引数などが記録されています。

キュー(Queue)

タスクを格納する入れ物入れ物の構造が、先に入ったタスクから順に取り出す様になっているものをキューと言います。

ちなみに、最後に入ったタスクから順に取り出す構造のものをスタックと呼びます。あわせて記憶しておくと良いでしょう。

  • キュー(Queue)
    先入先出【FIFO:First In First Out(ファースト・イン・ファースト・アウト)】
  • スタック(Stack)
    後入先出【LIFO:Last In First Out(ラスト・イン・ファースト・アウト)】

プロデューサー(Producer)

タスクを作成してブローカーに渡す役割を持ったもの。今回はこのプロデューサーに『Celery Client』を利用します。

Note

クライアント(Client)

他のコンピュータを利用する側のコンピュータ。つまり、ユーザ側のコンピュータや、他のコンピュータに命令を送る側のコンピュータのことです。

ブローカー(Broker)

作成されたタスクをキューに登録したり、キューに登録されているタスクをワーカーに渡したりする役割を持ったもの。今回はこのブローカーに『Redis』を利用します。

ワーカー(Woker)

ブローカーによってキューから取り出されたタスクを実際に処理する役割を持ったもの。今回はこのワーカーに『Celery Woker』を利用します。

Celery

Celeryは、Python製のタスクキューサービスでDjangoとあわせて利用しやすくなっています。タスクキューサービスは処理の重たいタスクをキューに登録し、別のプロセスで分散実行するための仕組み。

Celeryは別途メッセージの送受信を行うサービスを必要としますのでメッセージブローカーのRedisを使用します。

Redis(リモートディクショナリサーバ)

Redisはリモートディクショナリサーバを省略したもので、メッセージブローカー、および、キューとして利用できるオープンソースのKVS(キーバリューストア)。メインメモリ上で動作する(インメモリ)ため非常に高速で1秒間に数百万件のリクエストを処理することも可能。

Note

KVS(キーバリューストア)

データを保存したり管理するための手法の1つで、キー(key)と値(value)を一対にして管理しています。

Django、Celery、Redisを利用した非同期処理の実装

大まかな全体的の流れが掴めたところで次は実際の実装処理解説。

必要パッケージのインストール

最初に必要なパッケージのインストール作業を行います。非同期処理を実装するにあたり利用したものは以下の通りです。

pip install

  • celery
  • django-celery-results
  • redis
  • django-redis
  • django-celery-beat

django-celery-results

非同期処理したタスクの実行結果をデータベースに保存します。

django-celery-beat

定期実行等、非同期処理のスケジュールを管理します。

brew install

  • redis

Redisのインストール

Redisのインストール作業については下記のページをご確認ください。

pipで管理できるパッケージのインストール

pipで管理できるパッケージのインストールについては、そのままpipのインストールコマンドでインストールしていくだけです。

$ pip install celery

$ pip install django-celery-results
$ pip install redis
$ pip install django-redis
$ pip install django-celery-beat

Djangoプロジェクトファイルの作成

今回の解説用にプロジェクト名を『project』、アプリ名を『testapp』で新規のDjangoプロジェクトを作成します。この後解説する追加ファイルも含めた状態で最終的には以下のようなディレクトリ構成になります。

$ tree

.
├── testapp
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── forms.py
│   ├── models.py
│   ├── templates
│   │   └── testapp
│   │   └── celery_test.html
│   ├── tests.py
│   ├── urls.py
│   └── views.py
├── manage.py
├── project
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── sitemap.py
│   ├── tasks.py
│   ├── urls.py
│   └── wsgi.py
└── test_db.sqlite3

それでは各ファイル内の設定や、追加ファイル等の内容を詳しく見ていきましょう。

settings.py

新しく追加したパッケージ『django_celery_beat』と『django_celery_results』をsettings.pyのINSTALLED_APPSに追加します。

INSTALLED_APPS = [




'django_celery_beat',
'django_celery_results',
]

非同期処理の状態や実行結果をデーターベースに保存し、後から取得できるようにするには以下の1行も追加しておきましょう。

CELERY_RESULT_BACKEND = 'django-db'

今回はCeleryのブローカーにRedisを利用するため『CELERY_BROKER_URL』にRedisサーバのロケーションを指定します。

CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/1')

celery.py

settings.pyと同じ階層にcelery.pyを新規作成します。ファイル内のコードは以下の通りです。『project』の部分はプロジェクト名を表しています。違うプロジェクト名でDjangoファイルを作成している場合は適宜変更してください。

from __future__ import absolute_import, unicode_literals

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

各行のコード内容を解説しておきます。

以下の2行は、appに対してDjangoの設定モジュール(settings.py)をCeleryのデフォルト設定として利用するためのコードです。

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('project')

namespace = 'CELERY'は、すべてのcelery関連の設定キーに『CELERY_』という接頭辞を付けることを意味しています。

app.config_from_object('django.conf:settings', namespace='CELERY')

以下の1行で、Djangoアプリに登録されているすべてのタスクモジュールをロードします。

app.autodiscover_tasks()

tasks.py

settings.pyと同じ階層にtasks.pyを新規作成します。tasks.pyの中に非同期で利用する関数を記述しておきます。

from __future__ import absolute_import, unicode_literals

from celery import shared_task
import time

@shared_task
def add(x1, x2):
time.sleep(10)
y = x1 + x2
print('処理完了')
return y

今回記述した関数は10秒後に引数で受け取ったx1、x2を加算してリターンする簡単な関数です。テストに利用する関数の内容はどのようなものでも構いませんが、タスクとして利用する関数の引数に直接クエリセットのオブジェクトを渡すことはできないので気をつけましょう。

def add(x1, x2):

time.sleep(10)
y = x1 + x2
print('処理完了')
return y

関数の前に『@shared_task』デコレータをつけることで、 Celeryのタスクとしてロードし登録されます。『@shared_task』デコレータをつけた関数はすべて登録され、後ほど非同期処理のタスクとして処理できます。

@shared_task

Note

デコレータ

デコレータとは、(デコレート:修飾する)からもわかるように、関数の直前に付与することでその関数を修飾する(機能を付加する)ための仕組みのことです。

__init__.py

settings.pyと同じ階層の__init__.pyを編集します。__init__.pyの中に下記内容を記述してください。

from .celery import app as celery_app


__all__ = ('celery_app',)

Django起動時にcelery.pyからappをインポートしてロードします。

testapp > views.py

基本的な設定は完了しましたので、アプリの方でタスクを非同期処理させるためのコードを記述していきましょう。

from celery.result import AsyncResult


from project.tasks import add

def celery_test(request):
task_id = add.delay(5, 5)

result = AsyncResult(task_id)
print('result:', result, ' : ', result.state, ' : ', result.ready())

context = {'result': result}

return render(request, 'testapp/celery_test.html', context)

tasks.pyに記述した関数をインポートしておきます。

from project.tasks import add

先ほども少し解説したように『@shared_task』デコレータをつけた関数をタスクとして利用できます。非同期処理させるにはデコレータをつけた関数に『.delay』メソッドを追加することで非同期処理用のタスクとしてキューに追加されます。

関数名と引数はそのままに『.delay』メソッドを追加してください。

  • add(5, 5)
    ↓:非同期処理として実行する
  • add.delay(5, 5)

AsyncResultは『.delay』メソッドの戻り値を扱います。

print('result:', result, ' : ', result.state, ' : ', result.ready())

printで実行すると下記のような内容を返します。

result: 00000000-0000-0000-0000-000000000000  :  PENDING  :  False

result

タスクのIDが表示されます。

result.state

タスクの状態を確認できます。

  • PENDING:保留中
  • FAILURE:失敗
  • SUCCESS:成功
  • etc

result.ready()

非同期処理が終了したかどうかを判定します。

  • True:終了
  • False:終了していない

testapp > templates > testapp > celery_test.html

読み込むテンプレートを作成しておきます。今回はテスト用にメソッドの戻り値を表示させておきましょう。

<html>

<head>
<title>非同期処理TEST結果</title>
</head>
<body>
<h1>非同期処理TEST結果</h1>
<p>{{ result }}</p>
<p>{{ result.state }}</p>
<p>{{ result.ready }}</p>
</body>
</html>

testapp > urls.py

def celery_testを呼び出すためのパスを指定します。

urlpatterns = [

path('celery_test/', views.celery_test, name='celery_test'),

DBテーブルの作成

Celeryがバックエンドで利用するテーブルをデーターベースに追加する必要があります。Djangoサーバを起動する前にmigrateを実行しましょう。

$ python manage.py migrate

下記のように新しいテーブル『django_celery_results』が追加されました。

Operations to perform:

Apply all migrations: admin, auth, contenttypes, django_celery_beat, django_celery_results, sessions
Running migrations:
Applying django_celery_beat.0001_initial... OK
Applying django_celery_beat.0002_auto_20161118_0346... OK
Applying django_celery_beat.0003_auto_20161209_0049... OK
Applying django_celery_beat.0004_auto_20170221_0000... OK
Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
Applying django_celery_beat.0006_auto_20180322_0932... OK
Applying django_celery_beat.0007_auto_20180521_0826... OK
Applying django_celery_beat.0008_auto_20180914_1922... OK
Applying django_celery_beat.0006_auto_20180210_1226... OK
Applying django_celery_beat.0006_periodictask_priority... OK
Applying django_celery_beat.0009_periodictask_headers... OK
Applying django_celery_beat.0010_auto_20190429_0326... OK
Applying django_celery_beat.0011_auto_20190508_0153... OK
Applying django_celery_beat.0012_periodictask_expire_seconds... OK
Applying django_celery_results.0001_initial... OK
Applying django_celery_results.0002_add_task_name_args_kwargs... OK
Applying django_celery_results.0003_auto_20181106_1101... OK
Applying django_celery_results.0004_auto_20190516_0412... OK
Applying django_celery_results.0005_taskresult_worker... OK
Applying django_celery_results.0006_taskresult_date_created... OK
Applying django_celery_results.0007_remove_taskresult_hidden... OK

Celery テーブルの追加

ここまででプロジェクトの設定は完了です。

各種パッケージの起動

プロジェクト実行前に、必要となる各種パッケージを起動しておきましょう。

Redisサーバの起動

Redisサーバを起動します。Redisを起動するにはターミナルに下記のコマンドを入力してください。

$ redis-server

下記のように表示されていればRedisの起動に成功しています。

6540:C 04 May 2020 11:30:12.134 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo

6540:C 04 May 2020 11:30:12.134 # Redis version=6.0.1, bits=64, commit=00000000, modified=0, pid=6540, just started
6540:C 04 May 2020 11:30:12.134 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
6540:M 04 May 2020 11:30:12.136 * Increased maximum number of open files to 10032 (it was originally set to 256).
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 6.0.1 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in standalone mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 6379
| `-._ `._ / _.-' | PID: 6540
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'

6540:M 04 May 2020 11:30:12.138 # Server initialized
6540:M 04 May 2020 11:30:12.138 * Loading RDB produced by version 6.0.1
6540:M 04 May 2020 11:30:12.139 * RDB age 6911 seconds
6540:M 04 May 2020 11:30:12.139 * RDB memory usage when created 0.95 Mb
6540:M 04 May 2020 11:30:12.139 * DB loaded from disk: 0.000 seconds
6540:M 04 May 2020 11:30:12.139 * Ready to accept connections
  • 停止コマンド:Ctrl+C

celery beatの起動

celery beatの起動コマンドを実行するにはmanage.pyと同じ階層に移動して実行する必要があります。CDコマンドを利用して先にディレクトリを変更しておいてください。

$ cd /Users/xx/Documents/DJANGO_DIRECTRY 

起動コマンドは以下の通りです。

  • 『project』の部分はプロジェクト名を表しています。違うプロジェクト名でDjangoファイルを作成している場合は適宜変更してください。
$ DJANGO_SETTINGS_MODULE=project.settings celery -A project beat --scheduler django_celery_beat.schedulers:DatabaseScheduler --pidfile /tmp/celerybeat.pid

下記のように表示されていればcelery beatの起動に成功しています。

celery beat v4.4.2 (cliffs) is starting.

__ - ... __ - _
LocalTime -> 2020-05-04 02:32:41
Configuration ->
. broker -> redis://localhost:6379/1
. loader -> celery.loaders.app.AppLoader
. scheduler -> django_celery_beat.schedulers.DatabaseScheduler

. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 seconds (5s)
  • 停止コマンド:Ctrl+C

ワーカープロセスの起動

celery beatと同じで起動コマンドを実行するにはmanage.pyと同じ階層に移動して実行する必要があります。CDコマンドを利用して先にディレクトリを変更しておいてください。

$ cd /Users/xx/Documents/DJANGO_DIRECTRY 
  • 『project』の部分はプロジェクト名を表しています。違うプロジェクト名でDjangoファイルを作成している場合は適宜変更してください。
$ bash -c "DJANGO_SETTINGS_MODULE=project.settings celery -A project worker"

下記のように表示されていればcelery wokerの起動に成功しています。

 -------------- celery@MacBook-Pro.local v4.4.2 (cliffs)

--- ***** -----
-- ******* ---- macOS-10.15.4-x86_64-i386-64bit 2020-05-04 02:37:23
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: project:0x107750100
- ** ---------- .> transport: redis://localhost:6379/1
- ** ---------- .> results:
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
  • 停止コマンド:Ctrl+C

起動時のオプション設定

起動時にオプションコマンドを追加することで起動設定を変更することができます。

ログレベル:-l
  • DEBUG
  • INFO
  • WARNING
  • ERROR
  • CRITICAL
  • FATAL

いずれかのログレベルを選択できます。

transport

『CELERY_BROKER_URL』で指定したブローカーのURLです。コマンドラインで別のブローカーを指定する場合は下記のオプションコマンドを付与しましょう。

  • オプションコマンド:-b
concurrency

ワーカープロセスの数で、同時にタスク処理できる数が決まります。オプションで指定したワーカープロセスが全て処理中の場合、新しいタスクはキュー溜まり続けていきます。設定数を変更する場合は下記のオプションコマンドと設定数を付与しましょう。

  • オプションコマンド:-c

Note

・concurrency数はデフォルトでサーバのCPU数(コアを含む)が設定されます。

・CPU数の2倍以上の設定値を指定しても通常良い結果が得られません。

task events

有効になっていると、ワーカー内のタスク状況をモニタリングし監視メッセージ(イベント)を送信します。CeleryイベントやリアルタイムのCeleryモニタ『Flower』で利用できます。設定を切り替える場合は下記のオプションコマンドを付与しましょう。

  • オプションコマンド:-E
queues

ワーカーが処理するタスクキューのリストです。メッセージキューを特定のワーカーにルーティングし、ワーカーは一度に複数のキューから処理するように指示されます。キューを指定する場合には下記のオプションコマンドを付与しましょう。

  • オプション:-Q

Note

  • 本番環境ではデーモンとしてバックグラウンドで稼働させることをお勧めします。
  • ワーカープロセスやDjangoサーバ起動時にタスクモジュールが読み込まれるため、起動中にタスクモジュールの関数を新規作成・変更した場合はサーバを再起動(大半はワーカープロセスの再起動)しておきましょう。再起動せずに新しいタスクモジュールの関数を作成し利用した場合はタスクをキューに登録する時以下のようなエラーが表示されます。
  • Received unregistered task of type

非同期処理の実行

これで非同期処理用の全ての準備が完了しました。Djangoサーバを起動して実際に非同期処理を行ってみましょう。

非同期処理を実行するため指定したURLにアクセスします。

  • http://127.0.0.1:8000/testapp/celery_test/

以下のような画面が出力されましたらタスクがキューに追加され非同期処理がスタートしています。

Celery 非同期処理の結果表示

非同期処理が完了しましたらワーカープロセスのコマンドラインにprint関数の内容が出力されます。

Celery ワーカープロセスの出力画面

『処理完了』と出力されていますので非同期処理がうまく完了したことを示しています。

実行結果の確認

非同期処理の詳細な実行結果を確認します。

非同期処理の実行結果を管理画面から確認する

非同期処理の実行結果はDjangoのAdmin画面でも確認することができます。管理画面にアクセスしてみましょう。『Celery Results › Task results』が追加されているはずです。

Celery 管理画面

非同期処理の実行結果をデーターベースから取得する

DBから実行結果を取得したい場合はdjango_celery_resultsを利用します。django_celery_resultsを利用する場合、事前にsettings.pyへ下記1行を追加しておく必要があります。

CELERY_RESULT_BACKEND = 'django-db'

既に設定済みの場合は『TaskResult』オブジェクトからタスクIDを指定して実行処理の詳細な結果を取得しましょう。

# TaskResultをインポート

from django_celery_results.models import TaskResult
# TaskResultオブジェクトから実行結果を取得
result_object = TaskResult.objects.get(task_id=task_id)

今日のdot

MacOSのローカル環境にCeleryとRedisを利用して、Djangoで非同期処理を実装する方法に関する解説はこれで全て終了です。UX向上のため処理時間が長いものは積極的に非同期処理を採用するようにしましょう。