How to fix airflow error: print_context() missing 1 required positional argument: 'ds'





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty{ height:90px;width:728px;box-sizing:border-box;
}







0















I have a dag as below:
ingest_excel.py:



from __future__ import print_function

import time
from builtins import range
from datetime import timedelta
from pprint import pprint

import airflow
from airflow.models import DAG
#from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
'owner': 'rxie',
'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
dag_id='ingest_excel',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)

def print_context(**kwargs):
pprint("DAG info below:")
pprint(kwargs)
return 'Whatever you return gets printed in the logs'


t11_extract_excel_to_csv = PythonOperator(
task_id='t1_extract_excel_to_csv',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)


t12_upload_csv_to_hdfs_parquet = PythonOperator(
task_id='t12_upload_csv_to_hdfs_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)


t13_register_parquet_to_impala = PythonOperator(
task_id='t13_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)

t21_text_to_parquet = PythonOperator(
task_id='t21_text_to_parquet',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)

t22_register_parquet_to_impala = PythonOperator(
task_id='t22_register_parquet_to_impala',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)

t31_verify_completion = PythonOperator(
task_id='t31_verify_completion',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)

t32_send_notification = PythonOperator(
task_id='t32_send_notification',
provide_context=True,
python_callable=print_context(),
op_kwargs=None,
dag=dag,
)

t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala

t21_text_to_parquet >> t22_register_parquet_to_impala


t13_register_parquet_to_impala >> t31_verify_completion
t22_register_parquet_to_impala >> t31_verify_completion

t31_verify_completion >> t32_send_notification


#if __name__ == "__main__":
# dag.cli()


In the DAG GUI, the following error is shown:




Broken DAG: [/root/airflow/dags/ingest_excel.py] python_callable
param must be callable




This is my first DAG in Airflow, and I am pretty new to Airflow; it would be greatly appreciated if anyone could shed some light on this and help me sort it out.



Thank you in advance.










share|improve this question































    0















    I have a dag as below:
    ingest_excel.py:



    from __future__ import print_function

    import time
    from builtins import range
    from datetime import timedelta
    from pprint import pprint

    import airflow
    from airflow.models import DAG
    #from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator

    args = {
    'owner': 'rxie',
    'start_date': airflow.utils.dates.days_ago(2),
    }

    dag = DAG(
    dag_id='ingest_excel',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    )

    def print_context(**kwargs):
    pprint("DAG info below:")
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'


    t11_extract_excel_to_csv = PythonOperator(
    task_id='t1_extract_excel_to_csv',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
    )


    t12_upload_csv_to_hdfs_parquet = PythonOperator(
    task_id='t12_upload_csv_to_hdfs_parquet',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
    )


    t13_register_parquet_to_impala = PythonOperator(
    task_id='t13_register_parquet_to_impala',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
    )

    t21_text_to_parquet = PythonOperator(
    task_id='t21_text_to_parquet',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
    )

    t22_register_parquet_to_impala = PythonOperator(
    task_id='t22_register_parquet_to_impala',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
    )

    t31_verify_completion = PythonOperator(
    task_id='t31_verify_completion',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
    )

    t32_send_notification = PythonOperator(
    task_id='t32_send_notification',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
    )

    t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
    t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala

    t21_text_to_parquet >> t22_register_parquet_to_impala


    t13_register_parquet_to_impala >> t31_verify_completion
    t22_register_parquet_to_impala >> t31_verify_completion

    t31_verify_completion >> t32_send_notification


    #if __name__ == "__main__":
    # dag.cli()


    In the DAG GUI, the following error is shown:




    Broken DAG: [/root/airflow/dags/ingest_excel.py] python_callable
    param must be callable




    This is my first DAG in Airflow, and I am pretty new to Airflow; it would be greatly appreciated if anyone could shed some light on this and help me sort it out.



    Thank you in advance.










    share|improve this question



























      0












      0








      0








      I have a dag as below:
      ingest_excel.py:



      from __future__ import print_function

      import time
      from builtins import range
      from datetime import timedelta
      from pprint import pprint

      import airflow
      from airflow.models import DAG
      #from airflow.operators.bash_operator import BashOperator
      from airflow.operators.python_operator import PythonOperator

      args = {
      'owner': 'rxie',
      'start_date': airflow.utils.dates.days_ago(2),
      }

      dag = DAG(
      dag_id='ingest_excel',
      default_args=args,
      schedule_interval='0 0 * * *',
      dagrun_timeout=timedelta(minutes=60),
      )

      def print_context(**kwargs):
      pprint("DAG info below:")
      pprint(kwargs)
      return 'Whatever you return gets printed in the logs'


      t11_extract_excel_to_csv = PythonOperator(
      task_id='t1_extract_excel_to_csv',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )


      t12_upload_csv_to_hdfs_parquet = PythonOperator(
      task_id='t12_upload_csv_to_hdfs_parquet',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )


      t13_register_parquet_to_impala = PythonOperator(
      task_id='t13_register_parquet_to_impala',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t21_text_to_parquet = PythonOperator(
      task_id='t21_text_to_parquet',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t22_register_parquet_to_impala = PythonOperator(
      task_id='t22_register_parquet_to_impala',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t31_verify_completion = PythonOperator(
      task_id='t31_verify_completion',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t32_send_notification = PythonOperator(
      task_id='t32_send_notification',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
      t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala

      t21_text_to_parquet >> t22_register_parquet_to_impala


      t13_register_parquet_to_impala >> t31_verify_completion
      t22_register_parquet_to_impala >> t31_verify_completion

      t31_verify_completion >> t32_send_notification


      #if __name__ == "__main__":
      # dag.cli()


      In the DAG GUI, the following error is shown:




      Broken DAG: [/root/airflow/dags/ingest_excel.py] python_callable
      param must be callable




      This is my first DAG in Airflow, and I am pretty new to Airflow; it would be greatly appreciated if anyone could shed some light on this and help me sort it out.



      Thank you in advance.










      share|improve this question
















      I have a dag as below:
      ingest_excel.py:



      from __future__ import print_function

      import time
      from builtins import range
      from datetime import timedelta
      from pprint import pprint

      import airflow
      from airflow.models import DAG
      #from airflow.operators.bash_operator import BashOperator
      from airflow.operators.python_operator import PythonOperator

      args = {
      'owner': 'rxie',
      'start_date': airflow.utils.dates.days_ago(2),
      }

      dag = DAG(
      dag_id='ingest_excel',
      default_args=args,
      schedule_interval='0 0 * * *',
      dagrun_timeout=timedelta(minutes=60),
      )

      def print_context(**kwargs):
      pprint("DAG info below:")
      pprint(kwargs)
      return 'Whatever you return gets printed in the logs'


      t11_extract_excel_to_csv = PythonOperator(
      task_id='t1_extract_excel_to_csv',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )


      t12_upload_csv_to_hdfs_parquet = PythonOperator(
      task_id='t12_upload_csv_to_hdfs_parquet',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )


      t13_register_parquet_to_impala = PythonOperator(
      task_id='t13_register_parquet_to_impala',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t21_text_to_parquet = PythonOperator(
      task_id='t21_text_to_parquet',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t22_register_parquet_to_impala = PythonOperator(
      task_id='t22_register_parquet_to_impala',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t31_verify_completion = PythonOperator(
      task_id='t31_verify_completion',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t32_send_notification = PythonOperator(
      task_id='t32_send_notification',
      provide_context=True,
      python_callable=print_context(),
      op_kwargs=None,
      dag=dag,
      )

      t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
      t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala

      t21_text_to_parquet >> t22_register_parquet_to_impala


      t13_register_parquet_to_impala >> t31_verify_completion
      t22_register_parquet_to_impala >> t31_verify_completion

      t31_verify_completion >> t32_send_notification


      #if __name__ == "__main__":
      # dag.cli()


      In the DAG GUI, the following error is shown:




      Broken DAG: [/root/airflow/dags/ingest_excel.py] python_callable
      param must be callable




      This is my first DAG in Airflow, and I am pretty new to Airflow; it would be greatly appreciated if anyone could shed some light on this and help me sort it out.



      Thank you in advance.







      airflow directed-acyclic-graphs






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 26 '18 at 17:06







      mdivk

















      asked Nov 26 '18 at 16:39









      mdivkmdivk

      80031328




      80031328
























          2 Answers
          2






          active

          oldest

          votes


















          0














          To elaborate on your issue: your process is broken because you're not passing the function print_context to the PythonOperator, you're passing the result of calling print_context:



          [...]

          t32_send_notification = PythonOperator(
          task_id='t32_send_notification',
          provide_context=True,
          python_callable=print_context(), # <-- This is the issue.
          op_kwargs=None,
          dag=dag,
          )

          [...]


          Your function is returning the string 'Whatever you return gets printed in the logs' which is, in turn, being provided to the PythonOperator in the python_callable keyword argument. Airflow is essentially attempting to do the following:



          your_return = 'Whatever you return gets printed in the logs'
          your_return()


          ...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable keyword argument to simply print_context.






          share|improve this answer
























          • Thank you for your elaboration, joeb

            – mdivk
            Nov 26 '18 at 18:57



















          2














          I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.



          def print_context(**kwargs):
          ds = kwargs['ds']


          Also, the python_callable should be passed like this:



          python_callable=print_context,





          share|improve this answer
























          • Thank you very much. Removing the () resolved the issue.

            – mdivk
            Nov 26 '18 at 17:24












          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53485479%2fhow-to-fix-airflow-error-print-context-missing-1-required-positional-argument%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          2 Answers
          2






          active

          oldest

          votes








          2 Answers
          2






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0














          To elaborate on your issue: your process is broken because you're not passing the function print_context to the PythonOperator, you're passing the result of calling print_context:



          [...]

          t32_send_notification = PythonOperator(
          task_id='t32_send_notification',
          provide_context=True,
          python_callable=print_context(), # <-- This is the issue.
          op_kwargs=None,
          dag=dag,
          )

          [...]


          Your function is returning the string 'Whatever you return gets printed in the logs' which is, in turn, being provided to the PythonOperator in the python_callable keyword argument. Airflow is essentially attempting to do the following:



          your_return = 'Whatever you return gets printed in the logs'
          your_return()


          ...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable keyword argument to simply print_context.






          share|improve this answer
























          • Thank you for your elaboration, joeb

            – mdivk
            Nov 26 '18 at 18:57
















          0














          To elaborate on your issue: your process is broken because you're not passing the function print_context to the PythonOperator, you're passing the result of calling print_context:



          [...]

          t32_send_notification = PythonOperator(
          task_id='t32_send_notification',
          provide_context=True,
          python_callable=print_context(), # <-- This is the issue.
          op_kwargs=None,
          dag=dag,
          )

          [...]


          Your function is returning the string 'Whatever you return gets printed in the logs' which is, in turn, being provided to the PythonOperator in the python_callable keyword argument. Airflow is essentially attempting to do the following:



          your_return = 'Whatever you return gets printed in the logs'
          your_return()


          ...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable keyword argument to simply print_context.






          share|improve this answer
























          • Thank you for your elaboration, joeb

            – mdivk
            Nov 26 '18 at 18:57














          0












          0








          0







          To elaborate on your issue: your process is broken because you're not passing the function print_context to the PythonOperator, you're passing the result of calling print_context:



          [...]

          t32_send_notification = PythonOperator(
          task_id='t32_send_notification',
          provide_context=True,
          python_callable=print_context(), # <-- This is the issue.
          op_kwargs=None,
          dag=dag,
          )

          [...]


          Your function is returning the string 'Whatever you return gets printed in the logs' which is, in turn, being provided to the PythonOperator in the python_callable keyword argument. Airflow is essentially attempting to do the following:



          your_return = 'Whatever you return gets printed in the logs'
          your_return()


          ...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable keyword argument to simply print_context.






          share|improve this answer













          To elaborate on your issue: your process is broken because you're not passing the function print_context to the PythonOperator, you're passing the result of calling print_context:



          [...]

          t32_send_notification = PythonOperator(
          task_id='t32_send_notification',
          provide_context=True,
          python_callable=print_context(), # <-- This is the issue.
          op_kwargs=None,
          dag=dag,
          )

          [...]


          Your function is returning the string 'Whatever you return gets printed in the logs' which is, in turn, being provided to the PythonOperator in the python_callable keyword argument. Airflow is essentially attempting to do the following:



          your_return = 'Whatever you return gets printed in the logs'
          your_return()


          ...and you're receiving the error you see. The other contributor is correct in stating that you should change your PythonOperator.python_callable keyword argument to simply print_context.







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 26 '18 at 18:19









          joebjoeb

          2,28411519




          2,28411519













          • Thank you for your elaboration, joeb

            – mdivk
            Nov 26 '18 at 18:57



















          • Thank you for your elaboration, joeb

            – mdivk
            Nov 26 '18 at 18:57

















          Thank you for your elaboration, joeb

          – mdivk
          Nov 26 '18 at 18:57





          Thank you for your elaboration, joeb

          – mdivk
          Nov 26 '18 at 18:57













          2














          I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.



          def print_context(**kwargs):
          ds = kwargs['ds']


          Also, the python_callable should be passed like this:



          python_callable=print_context,





          share|improve this answer
























          • Thank you very much. Removing the () resolved the issue.

            – mdivk
            Nov 26 '18 at 17:24
















          2














          I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.



          def print_context(**kwargs):
          ds = kwargs['ds']


          Also, the python_callable should be passed like this:



          python_callable=print_context,





          share|improve this answer
























          • Thank you very much. Removing the () resolved the issue.

            – mdivk
            Nov 26 '18 at 17:24














          2












          2








          2







          I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.



          def print_context(**kwargs):
          ds = kwargs['ds']


          Also, the python_callable should be passed like this:



          python_callable=print_context,





          share|improve this answer













          I'm not entirely sure why your code doesn't work. It should work, but a workaround is given below.



          def print_context(**kwargs):
          ds = kwargs['ds']


          Also, the python_callable should be passed like this:



          python_callable=print_context,






          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 26 '18 at 17:14









          rmn36rmn36

          311




          311













          • Thank you very much. Removing the () resolved the issue.

            – mdivk
            Nov 26 '18 at 17:24



















          • Thank you very much. Removing the () resolved the issue.

            – mdivk
            Nov 26 '18 at 17:24

















          Thank you very much. Removing the () resolved the issue.

          – mdivk
          Nov 26 '18 at 17:24





          Thank you very much. Removing the () resolved the issue.

          – mdivk
          Nov 26 '18 at 17:24


















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53485479%2fhow-to-fix-airflow-error-print-context-missing-1-required-positional-argument%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Wiesbaden

          Marschland

          Dieringhausen