セロリキューには、知りたいこと、タスクの優先度、動的に指定されたキューがすべて含まれています。



Celery Queue Has Everything You Want Know



今日は、本番キューで一般的に使用されるキュー構成、タスク間に優先関係が存在するようにキューにタスクを設定する方法、およびタスクを送信するためのキューを動的に指定する方法を紹介します。

1.Celeryキュー構成の概要

CELERY_QUEUES = ( Queue('celery', Exchange('celery'), routing_key='celery') )

セロリは設定されたキュー名です
取引所は取引所の名前です
Routing_keyスイッチがキューと通信するために使用するキー
簡単に言うと、このプロセスでは、Celeryサーバーがrabbitmqで指定されたスイッチから対応するデータを取得し、このスイッチを使用して指定されたキューに送信します。

これはCeleryのデフォルトのキュー設定です。



次に、キューをもう1つ追加し、このキューに優先度属性を設定します。

# Priority queue settings CELERY_ACKS_LATE = True CELERYD_PREFETCH_MULTIPLIER = 1 CELERY_QUEUES = ( Queue('celery', Exchange('celery'), routing_key='celery'), Queue('celery_demo', Exchange('celery_demo'), routing_key='celery_demo', queue_arguments={'x-max-priority': 9}) )

キュー構成にはさらに2つの属性があり、キュー優先度値queue_argumentsが最大9に設定されていることがわかります。
2つの属性CELERY_ACKS_LATEおよびCELERYD_PREFETCH_MULTIPLIERを構成に追加する必要があることに注意してください。そうしないと、priority属性が有効になりません。



2.タスクキューを動的に指定し、優先度を設定します

タスクを送信するとき、apply_asyncは、キューを優先キューcelery_demoとして指定するように設定し、優先度を5に設定します。

from celery_test.task_register import * from celery import group from celery import chord sig = add.s(1, 1) sig.apply_async(queue='celery_demo',priority=5)

3.タスクを実行し、結果を比較します

比較をより便利にするために、タスクを登録し、10秒間スリープしてから、追加します。

@app.task def add_time(x, y): time.sleep(10) return x + y

1.サーバー監視タスクを開始します

画像
このサーバーは、デフォルトのセロリキューとpriority属性を持つcelery_demoの2つのキューを監視していることがわかります。 RabbitMQからも確認できます。 priがある場合は、優先順位が設定されます。レベルキュー。
画像



2.タスクを実行します

優先順位が有効かどうかの比較を容易にするために、ここでは次の3つのタスクを設定しました。

sig = add_time.s(1, 1) sig.apply_async(queue='celery') sig = add_time.s(2, 2) sig.apply_async(queue='celery') sig = add_time.s(3, 3) sig.apply_async(queue='celery')

最初に実行結果を確認すると、デフォルトのキューになります
画像
これらの3つのタスクは順番に実行され、最初に送信されたタスクが最初に実行されることがわかります。これも優先順位がない時間と一致しています。
次に、これら3つのタスクを優先キューcelery_demoに入れます。
3つのタスクの優先度パラメータを設定します

sig = add_time.s(1, 1) sig.apply_async(queue='celery_demo',priority=5) sig = add_time.s(2, 2) sig.apply_async(queue='celery_demo',priority=7) sig = add_time.s(3, 3) sig.apply_async(queue='celery_demo',priority=8)

画像
タスク1が最初に実行され、次にタスク3が実行され、最後にタスク2が実行されることがわかります。
なぜこうなった?優先度が最も高いタスク3を最初に実行するべきではありませんか?
ここでは、キューが現在空の場合、つまりキューが非ブロッキングの場合、もちろんどのタスクが最初に送信され、どのタスクが最初に実行されるかという問題を考慮する必要があります。
次に、タスク1は実行中に10秒間スリープします。これは、キューが10秒間ブロックされることを意味します。この時点で、キューにはまだタスク2と3があります。
この時点で、タスク2。タスク3がソートされようとしています。並べ替えの方法は、優先度で設定された値に基づいており、どちらか大きい方が最初に実行されます。
この並べ替えは、優先度キューが設定されている場合にのみ実行されます。それ以外の場合は、タスク送信の順序で実行されます。

一部の人々はあなたが優先度キューに提出するかどうか尋ねるかもしれませんが、優先度を設定しません、それはデフォルトで最高または最低レベルになりますか?
OK、結果を見てみましょう

sig = add_time.s(1, 1) sig.apply_async(queue='celery_demo',priority=5) sig = add_time.s(2, 2) sig.apply_async(queue='celery_demo') sig = add_time.s(3, 3) sig.apply_async(queue='celery_demo',priority=1)

画像
その結果から、キューは遅くともデフォルトで優先度を設定せずに実行され、順番に実行されます。

Celeryキューのいくつかの基本設定、優先順位設定、およびキューの動的指定が最初にここにあります。ご不明な点がございましたら、コメントしてお知らせください。

私は動くアリで、一緒に前進したいと思っています。

私が少しでもあなたを助けることができれば、あなたのサンリアンは私の創造の最大の動機です、ありがとう!

次の記事では、Celeryが並列タスク、ワークフロータスク、およびワークチェーンタスクを作成する方法について引き続き説明します。

注:このブログに間違いや提案がある場合は、それらを指摘していただければ幸いです。 ! !