Error while using WriteToBigquery in python for Dataflow pipeline. Unicode object has no attribute 'items'
My sample data is in json format and looks like:
{
"metadata": {
"action": "insert",
"type": "export",
"version": 1,
"timestamp": "2018-11-23T09:17:59.048-08:00"
},
"data": {
"attr1": 61,
"day": "2018-11-22",
"pin": "2C49956",
"CDP": 0,
"DP": 0,
"VD": 0,
"seo": 0,
"dir": 0,
"other": 0,
"at": 0
}
}
This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.
The code looks as below:
import collections
import json
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
WorkerOptions
from apache_beam.io.gcp import bigquery
# Creating options object
def create_options(argv):
# pipeline options
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'something'
google_cloud_options.job_name = datetime.now().strftime('somename')
google_cloud_options.staging_location = 'some_loc'
google_cloud_options.temp_location = 'another_loc'
options.view_as(StandardOptions).runner = 'DirectRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
return options
class PrepareData(beam.DoFn):
"""
ParDo function to create a dictionary of data for downstream consumption
"""
def process(self, element):
data = json.loads(element)
modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
return [modified_data]
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
# for dict in element["data"]["data"]:
# dict["timestamp"] = element["timestamp"]
return element["data"]["data"]
def run_pipe(options, argv):
"""
Creating pipelines
"""
p = beam.Pipeline(options=options)
main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())
""" Separating pipes for various actions """
insert_pipe= main_pipe | beam.ParDo(FilterInserts())
"""
Inserts--> sinking to BQ
"""
insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
project='some-data-warehouse',
dataset='sample_data',
table='sample',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED')
p.run()
def main():
"""
Main function to drive the run
:return: errors if any
"""
parser = argparse.ArgumentParser()
args = parser.parse_args()
try:
# create options
opt = create_options(argv=args)
# run pipeline
run_pipe(opt, argv=args)
except Exception as e:
logging.error('Pipeline failed with error : %s', e)
raise Exception('Pipeline failed with error : %s', e)
if __name__ == "__main__":
main()
I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
The error message is:
Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))
Can anyone help me figure out what's going wrong and how i can fix this?
python json google-bigquery google-cloud-dataflow apache-beam
add a comment |
My sample data is in json format and looks like:
{
"metadata": {
"action": "insert",
"type": "export",
"version": 1,
"timestamp": "2018-11-23T09:17:59.048-08:00"
},
"data": {
"attr1": 61,
"day": "2018-11-22",
"pin": "2C49956",
"CDP": 0,
"DP": 0,
"VD": 0,
"seo": 0,
"dir": 0,
"other": 0,
"at": 0
}
}
This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.
The code looks as below:
import collections
import json
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
WorkerOptions
from apache_beam.io.gcp import bigquery
# Creating options object
def create_options(argv):
# pipeline options
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'something'
google_cloud_options.job_name = datetime.now().strftime('somename')
google_cloud_options.staging_location = 'some_loc'
google_cloud_options.temp_location = 'another_loc'
options.view_as(StandardOptions).runner = 'DirectRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
return options
class PrepareData(beam.DoFn):
"""
ParDo function to create a dictionary of data for downstream consumption
"""
def process(self, element):
data = json.loads(element)
modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
return [modified_data]
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
# for dict in element["data"]["data"]:
# dict["timestamp"] = element["timestamp"]
return element["data"]["data"]
def run_pipe(options, argv):
"""
Creating pipelines
"""
p = beam.Pipeline(options=options)
main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())
""" Separating pipes for various actions """
insert_pipe= main_pipe | beam.ParDo(FilterInserts())
"""
Inserts--> sinking to BQ
"""
insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
project='some-data-warehouse',
dataset='sample_data',
table='sample',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED')
p.run()
def main():
"""
Main function to drive the run
:return: errors if any
"""
parser = argparse.ArgumentParser()
args = parser.parse_args()
try:
# create options
opt = create_options(argv=args)
# run pipeline
run_pipe(opt, argv=args)
except Exception as e:
logging.error('Pipeline failed with error : %s', e)
raise Exception('Pipeline failed with error : %s', e)
if __name__ == "__main__":
main()
I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
The error message is:
Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))
Can anyone help me figure out what's going wrong and how i can fix this?
python json google-bigquery google-cloud-dataflow apache-beam
add a comment |
My sample data is in json format and looks like:
{
"metadata": {
"action": "insert",
"type": "export",
"version": 1,
"timestamp": "2018-11-23T09:17:59.048-08:00"
},
"data": {
"attr1": 61,
"day": "2018-11-22",
"pin": "2C49956",
"CDP": 0,
"DP": 0,
"VD": 0,
"seo": 0,
"dir": 0,
"other": 0,
"at": 0
}
}
This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.
The code looks as below:
import collections
import json
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
WorkerOptions
from apache_beam.io.gcp import bigquery
# Creating options object
def create_options(argv):
# pipeline options
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'something'
google_cloud_options.job_name = datetime.now().strftime('somename')
google_cloud_options.staging_location = 'some_loc'
google_cloud_options.temp_location = 'another_loc'
options.view_as(StandardOptions).runner = 'DirectRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
return options
class PrepareData(beam.DoFn):
"""
ParDo function to create a dictionary of data for downstream consumption
"""
def process(self, element):
data = json.loads(element)
modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
return [modified_data]
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
# for dict in element["data"]["data"]:
# dict["timestamp"] = element["timestamp"]
return element["data"]["data"]
def run_pipe(options, argv):
"""
Creating pipelines
"""
p = beam.Pipeline(options=options)
main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())
""" Separating pipes for various actions """
insert_pipe= main_pipe | beam.ParDo(FilterInserts())
"""
Inserts--> sinking to BQ
"""
insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
project='some-data-warehouse',
dataset='sample_data',
table='sample',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED')
p.run()
def main():
"""
Main function to drive the run
:return: errors if any
"""
parser = argparse.ArgumentParser()
args = parser.parse_args()
try:
# create options
opt = create_options(argv=args)
# run pipeline
run_pipe(opt, argv=args)
except Exception as e:
logging.error('Pipeline failed with error : %s', e)
raise Exception('Pipeline failed with error : %s', e)
if __name__ == "__main__":
main()
I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
The error message is:
Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))
Can anyone help me figure out what's going wrong and how i can fix this?
python json google-bigquery google-cloud-dataflow apache-beam
My sample data is in json format and looks like:
{
"metadata": {
"action": "insert",
"type": "export",
"version": 1,
"timestamp": "2018-11-23T09:17:59.048-08:00"
},
"data": {
"attr1": 61,
"day": "2018-11-22",
"pin": "2C49956",
"CDP": 0,
"DP": 0,
"VD": 0,
"seo": 0,
"dir": 0,
"other": 0,
"at": 0
}
}
This is in a flat file and the objective is to run a dataflow pipeline in batch mode to insert the data into bigquery table. In one of the transformations where I want to take timestamp from the metadata and add it as a key value pair in the data section, I am getting the error from dataflow saying 'unicode object has no attribute 'items'.
The code looks as below:
import collections
import json
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
WorkerOptions
from apache_beam.io.gcp import bigquery
# Creating options object
def create_options(argv):
# pipeline options
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'something'
google_cloud_options.job_name = datetime.now().strftime('somename')
google_cloud_options.staging_location = 'some_loc'
google_cloud_options.temp_location = 'another_loc'
options.view_as(StandardOptions).runner = 'DirectRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
return options
class PrepareData(beam.DoFn):
"""
ParDo function to create a dictionary of data for downstream consumption
"""
def process(self, element):
data = json.loads(element)
modified_data = {"action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data}
return [modified_data]
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
# for dict in element["data"]["data"]:
# dict["timestamp"] = element["timestamp"]
return element["data"]["data"]
def run_pipe(options, argv):
"""
Creating pipelines
"""
p = beam.Pipeline(options=options)
main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())
""" Separating pipes for various actions """
insert_pipe= main_pipe | beam.ParDo(FilterInserts())
"""
Inserts--> sinking to BQ
"""
insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
project='some-data-warehouse',
dataset='sample_data',
table='sample',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED')
p.run()
def main():
"""
Main function to drive the run
:return: errors if any
"""
parser = argparse.ArgumentParser()
args = parser.parse_args()
try:
# create options
opt = create_options(argv=args)
# run pipeline
run_pipe(opt, argv=args)
except Exception as e:
logging.error('Pipeline failed with error : %s', e)
raise Exception('Pipeline failed with error : %s', e)
if __name__ == "__main__":
main()
I am running this on direct runner to test on local but i get the same error even if i change the runner to dataflow-runner.
The error message is:
Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))
Can anyone help me figure out what's going wrong and how i can fix this?
python json google-bigquery google-cloud-dataflow apache-beam
python json google-bigquery google-cloud-dataflow apache-beam
asked Nov 25 '18 at 1:08
Utsav ChatterjeeUtsav Chatterjee
7410
7410
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
By using the following table's schema
(You can modify it according to your necessities):
schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'
Try the following on your FilterInserts
class:
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
return [{
'VD': element['data']['data']['VD'],
'pin': element['data']['data']['pin'],
'timestamp': element['data']['data']['timestamp'],
'other': element['data']['data']['other'],
'CDP': element['data']['data']['CDP'],
'dir': element['data']['data']['dir'],
'attr1' : element['data']['data']['attr1'],
'seo' : element['data']['data']['seo'],
'day' : element['data']['data']['day'],
'DP' : element['data']['data']['DP'],
'at' : element['data']['data']['at'],
}]
The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.
Hope it helps.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53463824%2ferror-while-using-writetobigquery-in-python-for-dataflow-pipeline-unicode-objec%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
By using the following table's schema
(You can modify it according to your necessities):
schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'
Try the following on your FilterInserts
class:
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
return [{
'VD': element['data']['data']['VD'],
'pin': element['data']['data']['pin'],
'timestamp': element['data']['data']['timestamp'],
'other': element['data']['data']['other'],
'CDP': element['data']['data']['CDP'],
'dir': element['data']['data']['dir'],
'attr1' : element['data']['data']['attr1'],
'seo' : element['data']['data']['seo'],
'day' : element['data']['data']['day'],
'DP' : element['data']['data']['DP'],
'at' : element['data']['data']['at'],
}]
The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.
Hope it helps.
add a comment |
By using the following table's schema
(You can modify it according to your necessities):
schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'
Try the following on your FilterInserts
class:
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
return [{
'VD': element['data']['data']['VD'],
'pin': element['data']['data']['pin'],
'timestamp': element['data']['data']['timestamp'],
'other': element['data']['data']['other'],
'CDP': element['data']['data']['CDP'],
'dir': element['data']['data']['dir'],
'attr1' : element['data']['data']['attr1'],
'seo' : element['data']['data']['seo'],
'day' : element['data']['data']['day'],
'DP' : element['data']['data']['DP'],
'at' : element['data']['data']['at'],
}]
The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.
Hope it helps.
add a comment |
By using the following table's schema
(You can modify it according to your necessities):
schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'
Try the following on your FilterInserts
class:
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
return [{
'VD': element['data']['data']['VD'],
'pin': element['data']['data']['pin'],
'timestamp': element['data']['data']['timestamp'],
'other': element['data']['data']['other'],
'CDP': element['data']['data']['CDP'],
'dir': element['data']['data']['dir'],
'attr1' : element['data']['data']['attr1'],
'seo' : element['data']['data']['seo'],
'day' : element['data']['data']['day'],
'DP' : element['data']['data']['DP'],
'at' : element['data']['data']['at'],
}]
The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.
Hope it helps.
By using the following table's schema
(You can modify it according to your necessities):
schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'
Try the following on your FilterInserts
class:
class FilterInserts(beam.DoFn):
"""
Filter data for inserts
"""
def process(self, element):
if element["action"] == "insert":
element['data']['data']['timestamp'] = element['timestamp']
return [{
'VD': element['data']['data']['VD'],
'pin': element['data']['data']['pin'],
'timestamp': element['data']['data']['timestamp'],
'other': element['data']['data']['other'],
'CDP': element['data']['data']['CDP'],
'dir': element['data']['data']['dir'],
'attr1' : element['data']['data']['attr1'],
'seo' : element['data']['data']['seo'],
'day' : element['data']['data']['day'],
'DP' : element['data']['data']['DP'],
'at' : element['data']['data']['at'],
}]
The issue happens since you need to send a Key Value array to BigQuery, and you were sending a JSON dictionary with unicode strings.
Hope it helps.
answered Nov 26 '18 at 20:33
F10F10
1,4332514
1,4332514
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53463824%2ferror-while-using-writetobigquery-in-python-for-dataflow-pipeline-unicode-objec%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown