-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TMP TEST 2 #8964
base: main
Are you sure you want to change the base?
TMP TEST 2 #8964
Conversation
def create(self, w): | ||
if w.hub2 is None: | ||
required_hub = getattr(w._conninfo, 'requires_hub', None) | ||
w.hub2 = set_event_loop(( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn’t use the set_event_loop
because it uses the global. It needs to create a new instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs to create a hub per consumer (e.g. from w.consumers
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Integrate logic into Hub class:
- If
w.consumer
is NOT none, go as usual. - if
w.consumers
is NOT none, apply same logic for each consumer (to get its own hub (loop) instance).
@@ -137,7 +178,8 @@ def create(self, w): | |||
max_restarts = None | |||
if w.app.conf.worker_pool in GREEN_POOLS: # pragma: no cover | |||
warnings.warn(UserWarning(W_POOL_SETTING)) | |||
threaded = not w.use_eventloop or IS_WINDOWS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not w.use_eventloop or IS_WINDOWS or app.new_flag is not None
@@ -215,14 +257,16 @@ def create(self, w): | |||
class Consumer(bootsteps.StartStopStep): | |||
"""Bootstep starting the Consumer blueprint.""" | |||
|
|||
requires = ('celery.worker.components:Hub2',) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not be needed then
last = True | ||
|
||
def create(self, w): | ||
if w.max_concurrency: | ||
prefetch_count = max(w.max_concurrency, 1) * w.prefetch_multiplier | ||
else: | ||
prefetch_count = w.concurrency * w.prefetch_multiplier | ||
c = w.consumer = self.instantiate( | ||
c = w.consumer = [self.instantiate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If new flag True:
w.consumer
= Nonew.consumers
= List
Else:
w.consumer
= Normal usagew.consumers
= None
@@ -181,7 +181,8 @@ def __init__(self, on_task_request, | |||
pool=None, app=None, | |||
timer=None, controller=None, hub=None, amqheartbeat=None, | |||
worker_options=None, disable_rate_limits=False, | |||
initial_prefetch_count=2, prefetch_multiplier=1, **kwargs): | |||
initial_prefetch_count=2, prefetch_multiplier=1, url=None, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add new parameter to allow passing the url directly when instantiating a new Consumer but use None for the default behavior
@@ -158,7 +158,11 @@ def on_close(self): | |||
|
|||
def on_stopped(self): | |||
self.timer.stop() | |||
self.consumer.shutdown() | |||
if isinstance(self.consumer, list): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adjusting the w.consumer
to be a list, use the new flag to choose which consumer to act upon
!!! DO NOT MERGE !!!
Testing if something works in the CI or not due to using Mac as a local dev machine where it does work.