Getting "*** Task instance did not exist in the DB" as error when running gcs_to_bq in Composer











While executing the following Python script with Cloud Composer, I get "*** Task instance did not exist in the DB" under the gcs2bq task log in Airflow.

Code:



import datetime
import os
import csv
import pandas as pd
import pip

from airflow import models
# from airflow.contrib.operators import dataproc_operator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import trigger_rule
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import bigquery_operator

print('''/-------/--------/------/
-------/--------/------/''')

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting the start date to yesterday starts the DAG immediately when
    # it is detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry, set the 'email' arg to your email and
    # enable emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes.
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'data-rubrics'
    # models.Variable.get('gcp_project')
}

# [START composer_quickstart_schedule]
with models.DAG(
        'composer_agg_quickstart',
        # Continue to run the DAG once per day.
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_quickstart_schedule]
    op_start = BashOperator(task_id='Initializing',
                            bash_command='echo Initialized')
    # op_readwrite = PythonOperator(task_id='ReadAggWriteFile',
    #                               python_callable=read_data)
    op_load = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='gcs2bq',
        bucket='dr-mockup-data',
        source_objects=['sample.csv'],
        destination_project_dataset_table='data-rubrics.sample_bqtable',
        schema_fields=[
            {'name': 'a', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'b', 'type': 'FLOAT', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE',
        dag=dag)
    # op_write = PythonOperator(task_id='AggregateAndWriteFile',
    #                           python_callable=write_data)

    op_start >> op_load









      airflow google-cloud-composer






asked 2 days ago by Gaurav Taneja
          2 Answers
          UPDATE:



Can you remove dag=dag from the gcs2bq task, since you are already using with models.DAG, and run your DAG again?





          It might be because you have a dynamic start date. Your start_date should never be dynamic. Read this FAQ: https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date




          We recommend against using dynamic values as start_date, especially datetime.now() as it can be quite confusing. The task is triggered once the period closes, and in theory an @hourly DAG would never get to an hour after now as now() moves along.




          Make your start_date static or use Airflow utils/macros:



import airflow

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
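
Putting the two suggestions together, here is a minimal sketch of how the question's DAG could be rewritten (it assumes the load task stays as posted, and only drops the redundant dag=dag and the hand-rolled dynamic start date):

import datetime

import airflow
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import gcs_to_bq

default_dag_args = {
    # A start date computed by the Airflow helper, as the FAQ advises,
    # instead of recomputing "yesterday" on every file parse.
    'start_date': airflow.utils.dates.days_ago(2),
    'project_id': 'data-rubrics',
}

with models.DAG(
        'composer_agg_quickstart',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    op_start = BashOperator(task_id='Initializing',
                            bash_command='echo Initialized')
    # No dag=dag argument: the "with models.DAG(...)" context manager
    # already attaches every operator created inside it to this DAG.
    op_load = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='gcs2bq',
        bucket='dr-mockup-data',
        source_objects=['sample.csv'],
        destination_project_dataset_table='data-rubrics.sample_bqtable',
        schema_fields=[
            {'name': 'a', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'b', 'type': 'FLOAT', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE')

    op_start >> op_load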





edited 2 days ago, answered 2 days ago by kaxil
• Understood, but the op_start task (which shares the same config and hence the same start date) runs fine, while for gcs2bq no task instance is generated at all.
  – Gaurav Taneja, 2 days ago










• I have updated the answer. Can you remove dag=dag from the gcs2bq task, since you are already using with models.DAG, and run your DAG again?
  – kaxil, 2 days ago










• Removed dag=dag for gcs2bq, but nothing changed.
  – Gaurav Taneja, 2 days ago































Okay, this was a stupid mistake on my part, and apologies to everyone who spent time here. I had another DAG running, because of which the one I was triggering was always stuck in the queue. Also, I did not write the correct value in destination_project_dataset_table. Thanks to all who spent time on this.
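
For reference, destination_project_dataset_table for GoogleCloudStorageToBigQueryOperator expects the dotted form (project.)dataset.table, so the value in the question ('data-rubrics.sample_bqtable') is missing the dataset component. A corrected value would look like the line below, where sample_dataset stands in for whatever the real dataset is called:

# 'sample_dataset' is a placeholder; substitute the actual BigQuery dataset.
destination_project_dataset_table='data-rubrics.sample_dataset.sample_bqtable'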






answered 2 days ago by Gaurav Taneja