Getting "*** Task instance did not exist in the DB" error when running gcs_to_bq in Composer
While executing the following Python script using Cloud Composer, I get *** Task instance did not exist in the DB under the gcs2bq task log in Airflow.
Code:
import datetime
import os
import csv

import pandas as pd
import pip

from airflow import models
# from airflow.contrib.operators import dataproc_operator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import trigger_rule
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import bigquery_operator

print('''/-------/--------/------/
-------/--------/------/''')

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'data-rubrics'
    # models.Variable.get('gcp_project')
}

# [START composer_quickstart_schedule]
with models.DAG(
        'composer_agg_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_quickstart_schedule]
    op_start = BashOperator(task_id='Initializing', bash_command='echo Initialized')
    # op_readwrite = PythonOperator(task_id='ReadAggWriteFile', python_callable=read_data)
    op_load = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='gcs2bq',
        bucket='dr-mockup-data',
        source_objects=['sample.csv'],
        destination_project_dataset_table='data-rubrics.sample_bqtable',
        schema_fields=[{'name': 'a', 'type': 'STRING', 'mode': 'NULLABLE'},
                       {'name': 'b', 'type': 'FLOAT', 'mode': 'NULLABLE'}],
        write_disposition='WRITE_TRUNCATE',
        dag=dag)
    # op_write = PythonOperator(task_id='AggregateAndWriteFile', python_callable=write_data)

    op_start >> op_load
airflow google-cloud-composer
asked 2 days ago
Gaurav Taneja
2 Answers
UPDATE: Can you remove dag=dag from the gcs2bq task, since you are already using with models.DAG, and run your DAG again?
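For illustration, a minimal sketch of that change, reusing the operator arguments from the question (default_dag_args is a stand-in here): inside a with models.DAG(...) as dag: block, every operator created in the block is attached to that DAG automatically, so an explicit dag=dag is redundant.

import datetime
from airflow import models
from airflow.contrib.operators import gcs_to_bq

# Stand-in for the question's default_dag_args (static date used for brevity).
default_dag_args = {'start_date': datetime.datetime(2018, 11, 1)}

with models.DAG(
        'composer_agg_quickstart',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    op_load = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='gcs2bq',
        bucket='dr-mockup-data',
        source_objects=['sample.csv'],
        destination_project_dataset_table='data-rubrics.sample_bqtable',
        schema_fields=[{'name': 'a', 'type': 'STRING', 'mode': 'NULLABLE'},
                       {'name': 'b', 'type': 'FLOAT', 'mode': 'NULLABLE'}],
        write_disposition='WRITE_TRUNCATE')  # no dag=dag: the with-block supplies it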
It might be because you have a dynamic start date. Your start_date should never be dynamic. Read this FAQ: https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date

"We recommend against using dynamic values as start_date, especially datetime.now(), as it can be quite confusing. The task is triggered once the period closes, and in theory an @hourly DAG would never get to an hour after now, as now() moves along."

Make your start_date static or use Airflow utils/macros:
import airflow

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
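Alternatively, following the FAQ's advice to keep start_date static, a sketch with a fixed timestamp (the date below is only illustrative):

import datetime

args = {
    'owner': 'airflow',
    # A fixed timestamp: unlike datetime.now() or a recomputed "yesterday",
    # it does not shift each time the scheduler re-parses the DAG file.
    'start_date': datetime.datetime(2018, 11, 1),
}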
Understood, but the op_start task (which shares the same config and hence the same start date) runs fine, yet for gcs2bq there isn't any task instance generated at all.
– Gaurav Taneja, 2 days ago

I have updated the answer. Can you remove dag=dag from the gcs2bq task, since you are already using with models.DAG, and run your DAG again?
– kaxil, 2 days ago

Removed dag=dag for gcs2bq, but nothing changed.
– Gaurav Taneja, 2 days ago
Okay, this was a stupid question on my part, and apologies to everyone who wasted time here. I had another DAG running, due to which the one I was shooting off was always stuck in the queue. Also, I did not write the correct value in destination_project_dataset_table. Thanks, and apologies to all who spent time.
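For anyone hitting the same error: destination_project_dataset_table must be fully qualified as <project>.<dataset>.<table>. A sketch of the corrected argument, with a hypothetical dataset name since the real one is not given in the thread:

# 'sample_dataset' is hypothetical; the question's value 'data-rubrics.sample_bqtable'
# was missing the dataset segment of <project>.<dataset>.<table>.
destination_project_dataset_table='data-rubrics.sample_dataset.sample_bqtable',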