How to fix airflow error: print_context() missing 1 required positional argument: 'ds'
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty{ height:90px;width:728px;box-sizing:border-box;
}
I have a dag as below:
ingest_excel.py:
from __future__ import print_function
import time
from builtins import range
from datetime import timedelta
from pprint import pprint
import airflow
from airflow.models import DAG
#from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'rxie',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='ingest_excel',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
def print_context(**kwargs):
    """Pretty-print the keyword arguments this callable receives.

    Prints a header line followed by the ``kwargs`` dict, then returns a
    fixed message string.

    Args:
        **kwargs: Arbitrary keyword arguments; they are only printed.

    Returns:
        The literal string 'Whatever you return gets printed in the logs'.
    """
    pprint("DAG info below:")
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'
t11_extract_excel_to_csv = PythonOperator(
task_id='t1_extract_excel_to_csv',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t12_upload_csv_to_hdfs_parquet = PythonOperator(
task_id='t12_upload_csv_to_hdfs_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t13_register_parquet_to_impala = PythonOperator(
task_id='t13_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t21_text_to_parquet = PythonOperator(
task_id='t21_text_to_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t22_register_parquet_to_impala = PythonOperator(
task_id='t22_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t31_verify_completion = PythonOperator(
task_id='t31_verify_completion',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala
t21_text_to_parquet >> t22_register_parquet_to_impala
t13_register_parquet_to_impala >> t31_verify_completion
t22_register_parquet_to_impala >> t31_verify_completion
t31_verify_completion >> t32_send_notification
#if __name__ == "__main__":
# dag.cli()
In DAG GUI it prompts:
Broken DAG: [/root/airflow/dags/ingest_excel.py]
python_callable
param must be callable
This is my first DAG in Airflow, and I am pretty new to it. It would be greatly appreciated if anyone could shed some light on this and help me sort it out.
Thank you in advance.
airflow directed-acyclic-graphs
add a comment |
I have a dag as below:
ingest_excel.py:
from __future__ import print_function
import time
from builtins import range
from datetime import timedelta
from pprint import pprint
import airflow
from airflow.models import DAG
#from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'rxie',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='ingest_excel',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
def print_context(**kwargs):
    """Pretty-print the keyword arguments this callable receives.

    Prints a header line followed by the ``kwargs`` dict, then returns a
    fixed message string.

    Args:
        **kwargs: Arbitrary keyword arguments; they are only printed.

    Returns:
        The literal string 'Whatever you return gets printed in the logs'.
    """
    pprint("DAG info below:")
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'
t11_extract_excel_to_csv = PythonOperator(
task_id='t1_extract_excel_to_csv',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t12_upload_csv_to_hdfs_parquet = PythonOperator(
task_id='t12_upload_csv_to_hdfs_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t13_register_parquet_to_impala = PythonOperator(
task_id='t13_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t21_text_to_parquet = PythonOperator(
task_id='t21_text_to_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t22_register_parquet_to_impala = PythonOperator(
task_id='t22_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t31_verify_completion = PythonOperator(
task_id='t31_verify_completion',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala
t21_text_to_parquet >> t22_register_parquet_to_impala
t13_register_parquet_to_impala >> t31_verify_completion
t22_register_parquet_to_impala >> t31_verify_completion
t31_verify_completion >> t32_send_notification
#if __name__ == "__main__":
# dag.cli()
In DAG GUI it prompts:
Broken DAG: [/root/airflow/dags/ingest_excel.py]
python_callable
param must be callable
This is my first DAG in Airflow, and I am pretty new to it. It would be greatly appreciated if anyone could shed some light on this and help me sort it out.
Thank you in advance.
airflow directed-acyclic-graphs
add a comment |
I have a dag as below:
ingest_excel.py:
from __future__ import print_function
import time
from builtins import range
from datetime import timedelta
from pprint import pprint
import airflow
from airflow.models import DAG
#from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'rxie',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='ingest_excel',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
def print_context(**kwargs):
    """Pretty-print the keyword arguments this callable receives.

    Prints a header line followed by the ``kwargs`` dict, then returns a
    fixed message string.

    Args:
        **kwargs: Arbitrary keyword arguments; they are only printed.

    Returns:
        The literal string 'Whatever you return gets printed in the logs'.
    """
    pprint("DAG info below:")
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'
t11_extract_excel_to_csv = PythonOperator(
task_id='t1_extract_excel_to_csv',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t12_upload_csv_to_hdfs_parquet = PythonOperator(
task_id='t12_upload_csv_to_hdfs_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t13_register_parquet_to_impala = PythonOperator(
task_id='t13_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t21_text_to_parquet = PythonOperator(
task_id='t21_text_to_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t22_register_parquet_to_impala = PythonOperator(
task_id='t22_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t31_verify_completion = PythonOperator(
task_id='t31_verify_completion',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala
t21_text_to_parquet >> t22_register_parquet_to_impala
t13_register_parquet_to_impala >> t31_verify_completion
t22_register_parquet_to_impala >> t31_verify_completion
t31_verify_completion >> t32_send_notification
#if __name__ == "__main__":
# dag.cli()
In DAG GUI it prompts:
Broken DAG: [/root/airflow/dags/ingest_excel.py]
python_callable
param must be callable
This is my first DAG in Airflow, and I am pretty new to it. It would be greatly appreciated if anyone could shed some light on this and help me sort it out.
Thank you in advance.
airflow directed-acyclic-graphs
I have a dag as below:
ingest_excel.py:
from __future__ import print_function
import time
from builtins import range
from datetime import timedelta
from pprint import pprint
import airflow
from airflow.models import DAG
#from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'rxie',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='ingest_excel',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
def print_context(**kwargs):
    """Pretty-print the keyword arguments this callable receives.

    Prints a header line followed by the ``kwargs`` dict, then returns a
    fixed message string.

    Args:
        **kwargs: Arbitrary keyword arguments; they are only printed.

    Returns:
        The literal string 'Whatever you return gets printed in the logs'.
    """
    pprint("DAG info below:")
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'
t11_extract_excel_to_csv = PythonOperator(
task_id='t1_extract_excel_to_csv',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t12_upload_csv_to_hdfs_parquet = PythonOperator(
task_id='t12_upload_csv_to_hdfs_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t13_register_parquet_to_impala = PythonOperator(
task_id='t13_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t21_text_to_parquet = PythonOperator(
task_id='t21_text_to_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t22_register_parquet_to_impala = PythonOperator(
task_id='t22_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t31_verify_completion = PythonOperator(
task_id='t31_verify_completion',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)
t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala
t21_text_to_parquet >> t22_register_parquet_to_impala
t13_register_parquet_to_impala >> t31_verify_completion
t22_register_parquet_to_impala >> t31_verify_completion
t31_verify_completion >> t32_send_notification
#if __name__ == "__main__":
# dag.cli()
In DAG GUI it prompts:
Broken DAG: [/root/airflow/dags/ingest_excel.py]
python_callable
param must be callable
This is my first DAG in Airflow, and I am pretty new to it. It would be greatly appreciated if anyone could shed some light on this and help me sort it out.
Thank you in advance.
airflow directed-acyclic-graphs
airflow directed-acyclic-graphs
edited Nov 26 '18 at 17:06
mdivk
asked Nov 26 '18 at 16:39
mdivkmdivk
80031328
80031328
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
To elaborate on your issue: your process is broken because you're not passing the function print_context
to the PythonOperator
, you're passing the result of calling print_context
:
[...]
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(), # <-- This is the issue.
op_kwargs=None,
dag=dag,
)
[...]
Your function is returning the string 'Whatever you return gets printed in the logs'
which is, in turn, being provided to the PythonOperator
in the python_callable
keyword argument. Airflow is essentially attempting to do the following:
your_return = 'Whatever you return gets printed in the logs'
your_return()
...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable
keyword argument to simply print_context
Thank you for your elaboration, joeb
– mdivk
Nov 26 '18 at 18:57
add a comment |
I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.
def print_context(**kwargs):
ds = kwargs['ds']
also the python_callable should be passed like this
python_callable=print_context,
Thank you very much. removing the () resolved the issue.
– mdivk
Nov 26 '18 at 17:24
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
// Initialise the answer editor by handing a fixed options object to
// StackExchange.prepareEditor.
function createEditor() {
    StackExchange.prepareEditor({
        heartbeatType: 'answer',
        autoActivateHeartbeat: false,
        convertImagesToLinks: true,
        noModals: true,
        showLowRepImageUploadWarning: true,
        reputationToPostImages: 10,
        bindNavPrevention: true,
        postfix: "",
        imageUploader: {
            // The HTML fragments below use \u003c / \u003e for angle brackets
            // and backslash-escaped inner quotes so that each one is a single
            // valid string literal.
            brandingHtml: "Powered by \u003ca class=\"icon-imgur-white\" href=\"https://imgur.com/\"\u003e\u003c/a\u003e",
            contentPolicyHtml: "User contributions licensed under \u003ca href=\"https://creativecommons.org/licenses/by-sa/3.0/\"\u003ecc by-sa 3.0 with attribution required\u003c/a\u003e \u003ca href=\"https://stackoverflow.com/legal/content-policy\"\u003e(content policy)\u003c/a\u003e",
            allowUrls: true
        },
        onDemand: true,
        discardSelector: ".discard-answer",
        immediatelyShowMarkdownHelp: true
    });
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53485479%2fhow-to-fix-airflow-error-print-context-missing-1-required-positional-argument%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
To elaborate on your issue: your process is broken because you're not passing the function print_context
to the PythonOperator
, you're passing the result of calling print_context
:
[...]
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(), # <-- This is the issue.
op_kwargs=None,
dag=dag,
)
[...]
Your function is returning the string 'Whatever you return gets printed in the logs'
which is, in turn, being provided to the PythonOperator
in the python_callable
keyword argument. Airflow is essentially attempting to do the following:
your_return = 'Whatever you return gets printed in the logs'
your_return()
...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable
keyword argument to simply print_context
Thank you for your elaboration, joeb
– mdivk
Nov 26 '18 at 18:57
add a comment |
To elaborate on your issue: your process is broken because you're not passing the function print_context
to the PythonOperator
, you're passing the result of calling print_context
:
[...]
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(), # <-- This is the issue.
op_kwargs=None,
dag=dag,
)
[...]
Your function is returning the string 'Whatever you return gets printed in the logs'
which is, in turn, being provided to the PythonOperator
in the python_callable
keyword argument. Airflow is essentially attempting to do the following:
your_return = 'Whatever you return gets printed in the logs'
your_return()
...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable
keyword argument to simply print_context
Thank you for your elaboration, joeb
– mdivk
Nov 26 '18 at 18:57
add a comment |
To elaborate on your issue: your process is broken because you're not passing the function print_context
to the PythonOperator
, you're passing the result of calling print_context
:
[...]
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(), # <-- This is the issue.
op_kwargs=None,
dag=dag,
)
[...]
Your function is returning the string 'Whatever you return gets printed in the logs'
which is, in turn, being provided to the PythonOperator
in the python_callable
keyword argument. Airflow is essentially attempting to do the following:
your_return = 'Whatever you return gets printed in the logs'
your_return()
...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable
keyword argument to simply print_context
To elaborate on your issue: your process is broken because you're not passing the function print_context
to the PythonOperator
, you're passing the result of calling print_context
:
[...]
t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(), # <-- This is the issue.
op_kwargs=None,
dag=dag,
)
[...]
Your function is returning the string 'Whatever you return gets printed in the logs'
which is, in turn, being provided to the PythonOperator
in the python_callable
keyword argument. Airflow is essentially attempting to do the following:
your_return = 'Whatever you return gets printed in the logs'
your_return()
...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable
keyword argument to simply print_context
answered Nov 26 '18 at 18:19
joebjoeb
2,28411519
2,28411519
Thank you for your elaboration, joeb
– mdivk
Nov 26 '18 at 18:57
add a comment |
Thank you for your elaboration, joeb
– mdivk
Nov 26 '18 at 18:57
Thank you for your elaboration, joeb
– mdivk
Nov 26 '18 at 18:57
Thank you for your elaboration, joeb
– mdivk
Nov 26 '18 at 18:57
add a comment |
I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.
def print_context(**kwargs):
ds = kwargs['ds']
also the python_callable should be passed like this
python_callable=print_context,
Thank you very much. removing the () resolved the issue.
– mdivk
Nov 26 '18 at 17:24
add a comment |
I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.
def print_context(**kwargs):
ds = kwargs['ds']
also the python_callable should be passed like this
python_callable=print_context,
Thank you very much. removing the () resolved the issue.
– mdivk
Nov 26 '18 at 17:24
add a comment |
I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.
def print_context(**kwargs):
ds = kwargs['ds']
also the python_callable should be passed like this
python_callable=print_context,
I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.
def print_context(**kwargs):
ds = kwargs['ds']
also the python_callable should be passed like this
python_callable=print_context,
answered Nov 26 '18 at 17:14
rmn36rmn36
311
311
Thank you very much. removing the () resolved the issue.
– mdivk
Nov 26 '18 at 17:24
add a comment |
Thank you very much. removing the () resolved the issue.
– mdivk
Nov 26 '18 at 17:24
Thank you very much. removing the () resolved the issue.
– mdivk
Nov 26 '18 at 17:24
Thank you very much. removing the () resolved the issue.
– mdivk
Nov 26 '18 at 17:24
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53485479%2fhow-to-fix-airflow-error-print-context-missing-1-required-positional-argument%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown