googleapis / python-bigquery
The Client.load_table_from_file() method depends on the MultipartUpload and ResumableUpload classes from google-resumable-media, which do not support specifying a timeout for the operation; these Client methods were therefore excluded from googleapis/google-cloud-python#9987.
When the resumable media dependency adds support for timeouts, a timeout argument should be added to these Client methods.
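A hypothetical sketch of the requested surface once the dependency supports it; the timeout argument below does not exist yet:
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig()

with open("data.csv", "rb") as file_obj:
    # 'timeout' is the requested (not yet available) argument.
    job = client.load_table_from_file(
        file_obj,
        "my_dataset.my_table",
        job_config=job_config,
        timeout=60.0,
    )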
This is a feature request. When loading a BigQuery table from CSV data, I would like to add columns populated from either a default value or the source filename.
Consider this example:
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("post_abbr", "STRING"),
]
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("us_states"), job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))
load_job.result() # Waits for table load to complete.
print("Job finished.")
destination_table = client.get_table(dataset_ref.table("us_states"))
print("Loaded {} rows.".format(destination_table.num_rows))
If we could add a default-value parameter to job_config.schema entries, extending the template and letting us customize entries, that would be great.
For example:
job_config.schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("post_abbr", "STRING"),
    bigquery.SchemaField("filename", "STRING", var_filename),
    bigquery.SchemaField("Process_DT", "DATETIME", '2019-01-01 00:00:00'),
]
With the last two fields added this way, they are NOT required in the CSV.
Optional add-on: any default parameter can override a CSV import field.
Thanks.
BigQuery client v1.24.0, all Python versions.
Fire up a Jupyter notebook or an IPython session and load the bigquery extension:
%load_ext google.cloud.bigquery
Use the bigquery cell magic without specifying the query:
%%bigquery --project my-project --destination_table my_dataset.my_table
Actual result:
An error message is displayed, but is kind of misleading:
ERROR:
table_id must be a fully-qualified ID in standard SQL format, e.g., "project.dataset.table_id", got
It might make the user wonder why the table ID is incorrect even though --destination_table is correctly specified.
Expected result:
The error message should explain that the query is missing in the cell body.
The current implementation assumes that any query without whitespace represents a table ID, but it does not first check that the cell body is non-empty. That check should be added.
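A minimal sketch of the missing guard (its exact placement inside the magic is an assumption):
query = cell.strip()
if not query:
    raise ValueError(
        "The query is missing from the cell body; provide a query or a "
        "fully-qualified table ID."
    )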
Blocked on:
Using the google-cloud-bigquery client with version 1.23.1
Python 3.7 (on linux and macos)
import logging

def table_to_df_iterator(project_id, dataset_id, table_id) -> iter:
    table_full_id = project_id + "." + dataset_id + "." + table_id
    client = get_client()
    index = 0
    while True:
        offset = BATCH_SIZE_ROWS * index
        df = client.list_rows(
            table_full_id, max_results=BATCH_SIZE_ROWS, start_index=offset
        ).to_dataframe()
        if df.empty:
            break
        # Note: the original logged len(DataFrame.index), referencing the
        # class rather than the df instance.
        logging.info(f"Offset is at {offset} got a dataframe of size {len(df.index)}")
        yield df
        index += 1
DEBUG:google.cloud.bigquery.table:Started reading table 'samsung-global-dashboard.1_Raw.Facebook_SEUK_VIDEO_20190101' with tabledata.list.
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/samsung-global-dashboard/datasets/1_Raw/tables/Facebook_SEUK_VIDEO_20190101/data?maxResults=100000&startIndex=100000 HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/samsung-global-dashboard/datasets/1_Raw/tables/Facebook_SEUK_VIDEO_20190101/data?pageToken=BEP6ZNORN4AQAAASAUIIBAEAAUNAQCEG6ADBBIENAYQP777777777777P4VAA%3D%3D%3D&maxResults=87354&startIndex=100000 HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/samsung-global-dashboard/datasets/1_Raw/tables/Facebook_SEUK_VIDEO_20190101/data?pageToken=BEP6ZNORN4AQAAASAUIIBAEAAUNAQCEG6ADBBOVKAUQP777777777777P4VAA%3D%3D%3D&maxResults=74708&startIndex=100000 HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/samsung-global-dashboard/datasets/1_Raw/tables/Facebook_SEUK_VIDEO_20190101/data?pageToken=BEP6ZNORN4AQAAASAUIIBAEAAUNAQCEG6ADBBVGHAQQP777777777777P4VAA%3D%3D%3D&maxResults=62062&startIndex=100000 HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/samsung-global-dashboard/datasets/1_Raw/tables/Facebook_SEUK_VIDEO_20190101/data?pageToken=BEP6ZNORN4AQAAASAUIIBAEAAUNAQCEG6ADBB3XEAMQP777777777777P4VAA%3D%3D%3D&maxResults=49416&startIndex=100000 HTTP/1.1" 200 None
Make the second call use an updated startIndex instead of 'nextPageToken'
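In the meantime, a hedged workaround sketch (reusing the names from the snippet above, and assuming pandas is installed): issue a single list_rows call and let the iterator follow the page tokens itself.
import pandas as pd

# One list_rows call; the iterator advances via nextPageToken internally.
rows = client.list_rows(table_full_id, page_size=BATCH_SIZE_ROWS)
for page in rows.pages:
    df = pd.DataFrame([dict(row.items()) for row in page])
    # ... process df ...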
Thanks!
df = bq_client.list_rows(table).to_dataframe()
df.dtypes
contractId int64
contractTypeId int64
affiliateId int64
invoicePeriodId int64
startDate **object**
endDate **object**
contractName object
details object
updated datetime64[ns, UTC]
created datetime64[ns, UTC]
dtype: object
Timestamps seem to work fine, but date is just being treated as a string.
Table schema is defined as:
'contract': [
    bigquery.SchemaField("contractId", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("contractTypeId", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("affiliateId", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("invoicePeriodId", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("startDate", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("endDate", "DATE", mode="NULLABLE"),
    bigquery.SchemaField("contractName", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("details", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("created", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("updated", "TIMESTAMP", mode="REQUIRED"),
]
This shouldn't have anything to do with pagination, because the startDate field is required; also, the table only has 20 records.
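Until the client maps DATE columns to a richer dtype, a hedged workaround sketch is to coerce them after download:
import pandas as pd

df = bq_client.list_rows(table).to_dataframe()
# Coerce the DATE columns, which arrive as Python date objects (dtype object).
df["startDate"] = pd.to_datetime(df["startDate"])
df["endDate"] = pd.to_datetime(df["endDate"])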
After updating google-cloud-bigquery from version 1.19.0 to 1.24.0, the following error started to pop up randomly:
requests.exceptions.ReadTimeout: HTTPSConnectionPool(host='bigquery.googleapis.com', port=443): Read timed out. (read timeout=11.0)
This is likely related to #34
Ubuntu 19.10 x64
3.7.5
pip 20.0.2
google-cloud-bigquery
version: 1.24.0
The error is not deterministic, but we have observed it both in production environments (compute instances in GCP) and in local development environments.
Out of about 100 queries, at least 1-2 fail with this error (which makes it reproducible). That means something like 1-2% of ALL requests fail!
If I understand correctly, the BQ API endpoint behind the result() method blocks for at most 10 seconds, and there is a 1-second margin to neutralize network lag etc. No retry mechanism covers the timeout, so a delay of more than 1 second fails the whole request.
In my opinion, having a non-configurable 1.0 s timeout is not safe. Also, non-mutating endpoints (like "is job finished") should automatically retry in case of timeout; this is not implemented at the moment.
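Until this is addressed, a hedged client-side workaround sketch (query_job is assumed to be a running QueryJob):
import requests.exceptions

# Workaround sketch: re-invoke result() when the poll's HTTP read times out.
for attempt in range(5):
    try:
        rows = query_job.result()
        break
    except requests.exceptions.ReadTimeout:
        if attempt == 4:
            raise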
We had to rollback to 1.19.0 to make everything stable again.
Nothing really helpful could be placed here.
The easiest way to reproduce this error is to run a query that takes MORE than 10 seconds in a 100x loop.
The stack trace is always the same:
Traceback (most recent call last):
File "<proj>/venv/lib/python3.7/site-packages/urllib3/connectionpool.py", line 421, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "<proj>/venv/lib/python3.7/site-packages/urllib3/connectionpool.py", line 416, in _make_request
httplib_response = conn.getresponse()
File "/usr/lib/python3.7/http/client.py", line 1344, in getresponse
response.begin()
File "/usr/lib/python3.7/http/client.py", line 306, in begin
version, status, reason = self._read_status()
File "/usr/lib/python3.7/http/client.py", line 267, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
File "/usr/lib/python3.7/ssl.py", line 1071, in recv_into
return self.read(nbytes, buffer)
File "/usr/lib/python3.7/ssl.py", line 929, in read
return self._sslobj.read(len, buffer)
socket.timeout: The read operation timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<proj>/venv/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "<proj>/venv/lib/python3.7/site-packages/urllib3/connectionpool.py", line 720, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "<proj>/venv/lib/python3.7/site-packages/urllib3/util/retry.py", line 400, in increment
raise six.reraise(type(error), error, _stacktrace)
File "<proj>/venv/lib/python3.7/site-packages/urllib3/packages/six.py", line 735, in reraise
raise value
File "<proj>/venv/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
chunked=chunked,
File "<proj>/venv/lib/python3.7/site-packages/urllib3/connectionpool.py", line 423, in _make_request
self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
File "<proj>/venv/lib/python3.7/site-packages/urllib3/connectionpool.py", line 331, in _raise_timeout
self, url, "Read timed out. (read timeout=%s)" % timeout_value
urllib3.exceptions.ReadTimeoutError: HTTPSConnectionPool(host='bigquery.googleapis.com', port=443): Read timed out. (read timeout=11.0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 3196, in result
super(QueryJob, self).result(retry=retry, timeout=timeout)
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 818, in result
return super(_AsyncJob, self).result(timeout=timeout)
File "<proj>/venv/lib/python3.7/site-packages/google/api_core/future/polling.py", line 122, in result
self._blocking_poll(timeout=timeout)
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 3098, in _blocking_poll
super(QueryJob, self)._blocking_poll(timeout=timeout)
File "<proj>/venv/lib/python3.7/site-packages/google/api_core/future/polling.py", line 101, in _blocking_poll
retry_(self._done_or_raise)()
File "<proj>/venv/lib/python3.7/site-packages/google/api_core/retry.py", line 289, in retry_wrapped_func
return retry_wrapped_func
File "<proj>/venv/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
return target()
File "<proj>/venv/lib/python3.7/site-packages/google/api_core/future/polling.py", line 80, in _done_or_raise
if not self.done():
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 3085, in done
timeout=timeout,
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 1287, in _get_query_results
retry, method="GET", path=path, query_params=extra_params, timeout=timeout
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 556, in _call_api
return call()
File "<proj>/venv/lib/python3.7/site-packages/google/api_core/retry.py", line 289, in retry_wrapped_func
return retry_wrapped_func
File "<proj>/venv/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
return target()
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/_http.py", line 419, in api_request
timeout=timeout,
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/_http.py", line 277, in _make_request
method, url, headers, data, target_object, timeout=timeout
File "<proj>/venv/lib/python3.7/site-packages/google/cloud/_http.py", line 315, in _do_request
url=url, method=method, headers=headers, data=data, timeout=timeout
File "<proj>/venv/lib/python3.7/site-packages/google/auth/transport/requests.py", line 317, in request
**kwargs
File "<proj>/venv/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
resp = self.send(prep, **send_kwargs)
File "<proj>/venv/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
r = adapter.send(request, **kwargs)
File "<proj>/venv/lib/python3.7/site-packages/requests/adapters.py", line 529, in send
raise ReadTimeout(e, request=request)
requests.exceptions.ReadTimeout: HTTPSConnectionPool(host='bigquery.googleapis.com', port=443): Read timed out. (read timeout=11.0)
Following a comment in another thread, a user expressed the need for dynamically specifying the source table ID.
Describe the solution you'd like
The user wants to templatize Jupyter notebooks and needs a dynamic way of specifying the source table ID, similar to the existing --destination_table parameter. The existing feature that allows the table ID to be specified in the cell body is not sufficient.
Describe alternatives you've considered
The first attempt was to use query parameters, but these can only be used for query data, and not for table, column, etc. identifiers (documentation).
There is a plan to add support for "raw" queries, but constructing these dynamically is a potential security risk, because no automatic parameter escaping takes place. Additionally, it will take some time to implement this feature on the backend, and the release date is not even known yet.
Good day,
I am using the Python client libraries for BigQuery. I noticed in the documentation that the google.cloud.bigquery.job.LoadJob methods do not indicate return types; they only give a brief explanation of each method.
In particular, I am looking for the return type of the result() method. I noticed in the source code that QueryJob.result() returns an EmptyRowIterator, but I cannot tell whether that is the case for LoadJobs, as there doesn't seem to be a result() method defined on the LoadJob class. Is the result() used in this context the one from google.api_core.future.polling.PollingFuture?
My goal is simply to determine whether the load job was successful or not.
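For reference, a minimal sketch of that check, assuming PollingFuture semantics (result() blocks and re-raises any job error; client, uri, table_ref, and job_config are assumed names):
load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
load_job.result()  # raises on failure, so reaching the next line means success
assert load_job.state == "DONE"
assert load_job.errors is None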
Would it be possible to add some information on the return types of the methods, as is the case with other classes in the library?
As explained in a comment, preventing synthtool from overriding the customized noxfile.py can be achieved in a more straightforward way than the one currently used.
This is a tracking FR for implementing the Integer Range Partitioning functionality within the
BigQuery API.
Integer range partitioning supports partitioning a table based on the values in a specified integer column, and divides the table using start, stop, and interval values. A table can only support a single partitioning specification, so users can choose either integer partitioning or time-based partitioning, but not both. Clustering is independent of partitioning and works with both.
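One possible client surface, mirroring the REST API's rangePartitioning field (the class and parameter names below are assumptions, and schema/client come from the usual setup):
from google.cloud import bigquery

table = bigquery.Table("my-project.my_dataset.my_table", schema=schema)
# Hypothetical: partition on an integer column using start, end, and interval.
table.range_partitioning = bigquery.RangePartitioning(
    field="customer_id",
    range_=bigquery.PartitionRange(start=0, end=100000, interval=10),
)
table = client.create_table(table)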
More implementation details about this API are shared directly with library
maintainers. Please contact shollyman for questions.
I hope to attach Data Catalog policy tags to fields for column-level security:
https://cloud.google.com/bigquery/docs/column-level-security-intro
It would just append a policyTags field to the field schema, like googleapis/google-cloud-go@4c25d13.
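A sketch of what that could look like on the Python side (policy_tags and PolicyTagList are assumed names, mirroring the Go change):
from google.cloud import bigquery

# Hypothetical surface: attach a Data Catalog policy tag to a field.
field = bigquery.SchemaField(
    "ssn",
    "STRING",
    policy_tags=bigquery.PolicyTagList(
        names=["projects/my-project/locations/us/taxonomies/123/policyTags/456"]
    ),
)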
Hi all, I tried the BQ client in Python with the default example. Since I moved from 1.23.1 to 1.24.0 last week, I get the following issue.
It is related to pyarrow, but I was not upgrading pyarrow (it worked with it before).
Python 3.7.6
bigquery.version '1.24.0'
pyarrow.version '0.11.1'
Linux jupyter-generic 4.15.0-1057-aws googleapis/google-cloud-python#59-Ubuntu SMP Wed Dec 4 10:02:00 UTC 2019 x86_64
x86_64 x86_64 GNU/Linux
Name: google-cloud-bigquery
Version: 1.24.0
Summary: Google BigQuery API client library
Location: /opt/conda/lib/python3.7/site-packages
Requires: google-cloud-core, google-auth, six, google-resumable-media, protobuf, google-api-core
Required-by: pandas-gbq
I am just running a default example from the web: https://cloud.google.com/bigquery/docs/bigquery-storage-python-pandas
import google.auth
from google.cloud import bigquery
client = bigquery.Client.from_service_account_json('cred.json')
# Download query results.
query_string = """
SELECT
CONCAT(
'https://stackoverflow.com/questions/',
CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""
dataframe = (
    client.query(query_string)
    .result()
    .to_dataframe()
)
print(dataframe.head())
--------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-11-61d06599dbdd> in <module>
12
13 dataframe = (
---> 14 client.query(query_string)
15 .result()
16 .to_dataframe()
/opt/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py in to_dataframe(self, bqstorage_client, dtypes, progress_bar_type, create_bqstorage_client)
1727 progress_bar_type=progress_bar_type,
1728 bqstorage_client=bqstorage_client,
-> 1729 create_bqstorage_client=create_bqstorage_client,
1730 )
1731 df = record_batch.to_pandas()
/opt/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py in to_arrow(self, progress_bar_type, bqstorage_client, create_bqstorage_client)
1541 record_batches = []
1542 for record_batch in self._to_arrow_iterable(
-> 1543 bqstorage_client=bqstorage_client
1544 ):
1545 record_batches.append(record_batch)
/opt/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py in _to_page_iterable(self, bqstorage_download, tabledata_list_download, bqstorage_client)
1433 )
1434 )
-> 1435 for item in tabledata_list_download():
1436 yield item
1437
/opt/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py in download_arrow_tabledata_list(pages, bq_schema)
523
524 for page in pages:
--> 525 yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
526
527
/opt/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py in _tabledata_list_page_to_arrow(page, column_names, arrow_types)
499
500 if isinstance(column_names, pyarrow.Schema):
--> 501 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names)
502 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
503
/opt/conda/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.RecordBatch.from_arrays()
TypeError: from_arrays() takes at least 2 positional arguments (1 given)
We're getting some frequent test failures in the pandas integration tests due to timed-out requests, possibly because googleapis/google-cloud-python#10219 added a non-None default timeout.
It seems 60 seconds is not enough for these calls (but strange that so many API requests are timing out).
Context: googleapis/python-bigquery-pandas#309
create_dataset:
if new_retry.is_exhausted():
> raise MaxRetryError(_pool, url, error or ResponseError(cause))
E urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='bigquery.googleapis.com', port=443): Max retries exceeded with url: /bigquery/v2/projects/pandas-travis/datasets/pydata_pandas_bq_testing_dddqwbgrjz/tables/hrsctwrmva (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7ff87946e898>: Failed to establish a new connection: [Errno 110] Connection timed out',))
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/urllib3/util/retry.py:436: MaxRetryError
During handling of the above exception, another exception occurred:
self = <pandas.tests.io.test_gbq.TestToGBQIntegrationWithServiceAccountKeyPath object at 0x7ff879725160>
gbq_dataset = 'pydata_pandas_bq_testing_dddqwbgrjz.hrsctwrmva'
def test_roundtrip(self, gbq_dataset):
destination_table = gbq_dataset
test_size = 20001
df = make_mixed_dataframe_v2(test_size)
df.to_gbq(
destination_table,
_get_project_id(),
chunksize=None,
> credentials=_get_credentials(),
)
pandas/tests/io/test_gbq.py:185:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pandas/core/frame.py:1551: in to_gbq
credentials=credentials,
pandas/io/gbq.py:219: in to_gbq
private_key=private_key,
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/pandas_gbq/gbq.py:1202: in to_gbq
if table.exists(table_id):
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/pandas_gbq/gbq.py:1312: in exists
self.client.get_table(table_ref)
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/cloud/bigquery/client.py:679: in get_table
retry, method="GET", path=table_ref.path, timeout=timeout
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/cloud/bigquery/client.py:556: in _call_api
return call()
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/api_core/retry.py:286: in retry_wrapped_func
on_error=on_error,
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/api_core/retry.py:184: in retry_target
return target()
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/cloud/_http.py:419: in api_request
timeout=timeout,
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/cloud/_http.py:277: in _make_request
method, url, headers, data, target_object, timeout=timeout
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/cloud/_http.py:315: in _do_request
url=url, method=method, headers=headers, data=data, timeout=timeout
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/google/auth/transport/requests.py:317: in request
**kwargs
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/requests/sessions.py:533: in request
resp = self.send(prep, **send_kwargs)
../../../miniconda3/envs/pandas-dev/lib/python3.6/site-packages/requests/sessions.py:646: in send
r = adapter.send(request, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
Hi, I am running a query against a large table (approx. 100M records).
After some research and tests, I decided to use a QueryJob and call result() with page_size. I know that I can safely process approximately 4-5M records in a batch.
So I liked the idea of paging and used page_size=2000000, which is far below what can be handled. But then I realized I was getting much less data. I ran some analysis, and it looks like I am getting approx. 50K records per page, no matter what I specify in page_size.
Ubuntu 18.04 - 2CPUs / 4GB RAM
Python version - 3.6.9
google-cloud 0.34.0
{
    "type": "service_account",
    "project_id": "<your_project_id>",
    "private_key_id": "<your_private_key_id>",
    "private_key": "<your_private_key>",
    "client_email": "<your_client_email>",
    "client_id": "<your_client_id>",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://accounts.google.com/o/oauth2/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/projectname-service-account%40zeta-handler-9999.iam.gserviceaccount.com"
}
import os
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './service_account.json'
client = bigquery_storage_v1beta1.BigQueryStorageClient()
bqclient = bigquery.Client(project="<your_project_id>")
query_string = "SELECT * FROM <some_bq_table_with_many_records>"
iterator = bqclient.query(query_string).result(page_size=1000000)
for page in iterator.pages:
    print(len(list(page)))
There is no stack trace; it runs OK, just not as expected!
It would be nice to know why it is so and how to fix / change it. Thanks!
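For what it's worth, tabledata.list responses are capped in size (roughly 10 MB per response), so page_size acts only as an upper bound. Assuming that is the cause, a hedged sketch that sidesteps the cap by downloading through the Storage API client created above:
dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=client)  # streams via the Storage API
)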
Is your feature request related to a problem? Please describe.
If you have a pandas Series containing dictionaries, ideally this could be uploaded to BigQuery as a STRUCT / RECORD column. Currently this fails with a "file does not exist" error, because the arrow write_table call fails with "ArrowInvalid: Nested column branch had multiple children".
Describe the solution you'd like
Upload of a RECORD column succeeds. This will require a fix to https://jira.apache.org/jira/browse/ARROW-2587.
Describe alternatives you've considered
Change intermediate file format to JSON or some other type. This isn't ideal, since most other types are row-oriented, but pandas DataFrames are column-oriented.
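Another stopgap, sketched under the assumption that a STRING column is acceptable downstream: serialize the dictionaries to JSON strings before uploading ("records" is a hypothetical column name).
import json

# Serialize the dict column to JSON strings and load it as a STRING column.
dataframe["records"] = dataframe["records"].apply(json.dumps)
client.load_table_from_dataframe(dataframe, "my_dataset.my_table").result()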
In the pandas-related BigQuery code, there is a lot of branching on whether pyarrow is available or not, and significant portions of business logic deal with both cases.
The pyarrow side is often more efficient, more concise, and less prone to weird edge cases, so it is preferred that it eventually becomes the only code path.
The goal of this issue is to emit deprecation warnings whenever a code path is hit that deals with pandas but without the pyarrow dependency available.
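A minimal sketch of such a warning (the wording and warning category are assumptions):
import warnings

try:
    import pyarrow
except ImportError:
    pyarrow = None

if pyarrow is None:
    # Emitted on the pandas-without-pyarrow code path.
    warnings.warn(
        "Converting to/from pandas without pyarrow is deprecated and will be "
        "unsupported in a future release; please install pyarrow.",
        PendingDeprecationWarning,
    )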
I am fetching data from BigQuery by using:
from google.cloud import bigquery
...
def bigquery_to_df(query):
    '''A small tool to convert a BigQuery result to a DataFrame'''
    query_job = bigquery_client.query(query)
    df = query_job.result().to_dataframe()
    return df
...
But I got an AttributeError, which I didn't get before:
10 query_job = bigquery_client.query(query)
---> 11 df = query_job.result().to_dataframe()
12 return df
13
AttributeError: 'HTTPIterator' object has no attribute 'to_dataframe'
MacBook Pro
I used to use google-cloud-bigquery 0.28.0 and it worked fine for a long time until now. Even after I upgraded to 1.24.0, the error still occurs.
Does anyone encounter the same issue?
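A quick diagnostic sketch, assuming a stale environment might explain it: confirm which version and installation the running kernel actually imports.
import google.cloud.bigquery

print(google.cloud.bigquery.__version__)  # should print 1.24.0
print(google.cloud.bigquery.__file__)     # shows which installation is loaded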
QueryJobConfig does not seem to take into account the schema of the destination table.
It looks like a bug to me.
from google.cloud import bigquery
schema = [
    bigquery.SchemaField('id', 'INTEGER', 'REQUIRED', None, ()),
    bigquery.SchemaField('tag_name', 'STRING', 'REQUIRED', None, ()),
    bigquery.SchemaField('count', 'INTEGER', 'REQUIRED', None, ()),
    bigquery.SchemaField('excerpt_post_id', 'INTEGER', 'REQUIRED', None, ()),
    bigquery.SchemaField('wiki_post_id', 'INTEGER', 'REQUIRED', None, ()),
]
table_id = 'myproject.mydataset.mytable'
job_config = bigquery.QueryJobConfig()
job_config.destination = table_id
job_config.schema = schema
sql = """
SELECT id, REVERSE(tag_name) as tag_name, count, excerpt_post_id, wiki_post_id
FROM `bigquery-public-data.stackoverflow.tags`
"""
query_job = client.query(sql, job_config=job_config)
query_job.result()
<google.cloud.bigquery.table.RowIterator object at 0x7f52a8dfca58>
>>> client.get_table('myproject.mydataset.mytable').schema
[SchemaField('id', 'INTEGER', 'NULLABLE', None, ()),
SchemaField('tag_name', 'STRING', 'NULLABLE', None, ()),
SchemaField('count', 'INTEGER', 'NULLABLE', None, ()),
SchemaField('excerpt_post_id', 'INTEGER', 'NULLABLE', None, ()),
SchemaField('wiki_post_id', 'INTEGER', 'NULLABLE', None, ())]
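A hedged workaround sketch, assuming REQUIRED modes are the goal: create the destination table with the desired schema first, then write the query results into it.
table = bigquery.Table(table_id, schema=schema)
client.create_table(table)

job_config = bigquery.QueryJobConfig()
job_config.destination = table_id
# Append into the pre-created table so its REQUIRED modes are kept.
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
client.query(sql, job_config=job_config).result()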
The unit tests check fails on Python 2.7 and Python 3.5, because not all dependencies can be installed.
pip --version: 20.0.2
google-cloud-bigquery version: 1.24.0
nox -f noxfile.py -s unit-2.7
# example
Building wheels for collected packages: llvmlite
...
RuntimeError: Building llvmlite requires LLVM 7.0.x, 7.1.x or 8.0.x, got '11.0.0'. Be sure to set LLVM_CONFIG to the right executable path.
--export_to_csv
Hello,
For our business case, we would need an argument capable of handling CSV export to GCS directly with magic queries. We thought this new argument could enrich the library.
`--export_to_csv`
Args:
- "filename" (str): GCS destination URI (e.g. "gs://bucketname/filename.csv")
- "temporary_dataset" (str): location where the temporary table will be created in BigQuery before being exported as CSV to GCS and then deleted from BigQuery.
  IMPORTANT: needed if `--destination_table` is not used in the same magic query.
Example without --destination_table:
to_gcs = {
    "filename": "gs://bucket_name/emails_280319.csv",
    "temporary_dataset": "dataset_id",
}
%%bigquery --export_to_csv $to_gcs
SELECT emails, names
FROM `project_name.dataset_name.table_name`
result:
1 - create a temporary table with the query result
2 - export the table to GCS
3 - delete the temporary table
Example with --destination_table:
output_table = "consumers.emails_280319"
to_gcs = {"filename" : "gs://bucket_name/emails_280319.csv"}
In this case we don't need to declare "temporary_dataset", because the user wants to save the result in BigQuery before exporting it to GCS.
%%bigquery --destination_table $output_table --export_to_csv $to_gcs
SELECT emails, names
FROM `project_name.dataset_name.table_name`
result:
1 - save the query result in projectName.consumers.emails_280319
2 - export the table to GCS
_handle_export_to_csv:
def _handle_export_to_csv(client, destination_uri, destination_table_ref):
    extract_job = client.extract_table(
        destination_table_ref,
        destination_uri,
        location="EU",
    )
    return extract_job
_cell_magic()
job_config = bigquery.job.QueryJobConfig()
job_config.query_parameters = params
job_config.use_legacy_sql = args.use_legacy_sql
job_config.dry_run = args.dry_run

######################
# --destination_table
######################
extract_job = None
destination_table_ref = None
if args.destination_table:
    split = args.destination_table.split(".")
    if len(split) != 2:
        raise ValueError(
            "--destination_table should be in a <dataset_id>.<table_id> format."
        )
    dataset_id, table_id = split
    dataset_ref = bigquery.dataset.DatasetReference(client.project, dataset_id)
    print('destination dataset_ref =', dataset_ref)
    destination_table_ref = dataset_ref.table(table_id)
    print('destination_table_ref =', destination_table_ref)
    job_config.destination = destination_table_ref
    job_config.allow_large_results = True
    job_config.create_disposition = "CREATE_IF_NEEDED"
    job_config.write_disposition = "WRITE_TRUNCATE"
    _create_dataset_if_necessary(client, dataset_id)

######################
# --export_to_csv
######################
if args.export_to_csv:
    # Initialize the temporary table.
    export_csv_params = ast.literal_eval("".join(args.export_to_csv))
    len_export_csv_params = len(export_csv_params)
    if destination_table_ref is None:
        # The user wants --export_to_csv without --destination_table.
        if (not 0 < len_export_csv_params < 3) or (
            not export_csv_params.get('filename')
            or not export_csv_params.get('temporary_dataset')
        ):
            raise ValueError(
                "--export_to_csv should be in a {'filename': "
                "'gs://bucket_name/filename', 'temporary_dataset': "
                "'dataset_id'} format."
            )
        temporary_table = str(uuid.uuid4()).replace('-', '_')
        destination_table_ref = (
            project
            + '.'
            + export_csv_params['temporary_dataset']
            + '.'
            + temporary_table
        )
    elif (len_export_csv_params != 1) or (not export_csv_params.get('filename')):
        # When --destination_table is also given, ignore the dataset.
        raise ValueError(
            "--export_to_csv should be in a {'filename': "
            "'gs://bucket_name/filename'} format. No need to pass the "
            "'temporary_dataset' argument."
        )
    destination_uri = export_csv_params['filename']

try:
    # Run the query.
    query_job = _run_query(client, query, job_config=job_config)
    query_job.result()
    # Export to GCS.
    if args.export_to_csv:
        extract_job = _handle_export_to_csv(
            client, destination_uri, destination_table_ref
        )
        print('Exported to GCS')
except Exception as ex:
    _handle_error(ex, args.destination_var)  # <-- revisit this error handling?
    return

result = query_job.to_dataframe()

# Delete the temporary table.
if not args.destination_table and args.export_to_csv:
    client.delete_table(destination_table_ref)
    print('Deleted temporary table')

# Rest of the code.
if args.destination_var:
    IPython.get_ipython().push({args.destination_var: result})
else:
    return result
Would you know if a similar feature is planned for the library?
Thank you very much.
The fix for the likely external issue #61 is not expected within the next day or two, but the failing test is currently blocking all other PRs. The decision was made to temporarily adjust the test, and to revert the change once the fix is ready.
tl;dr: It looks like generated docstrings have invalid escape sequences. Happy to move this to the repo for the codegen if you point me at it.
Note that the sample below has to pass -X pycache_prefix=/tmp
to avoid loading the .pyc generated at package installation.
$ docker run -it --rm --entrypoint bash python:3.8-buster
root@dbd87231f72f:/# pip install -q google-cloud-bigquery
root@dbd87231f72f:/# python -Wall -X pycache_prefix=/tmp
Python 3.8.0 (default, Oct 17 2019, 05:36:36)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from google.cloud.bigquery_v2.gapic import enums
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/proto/model_pb2.py:3447: DeprecationWarning: invalid escape sequence \_
__doc__="""A single entry in the confusion matrix.
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/proto/model_pb2.py:3653: DeprecationWarning: invalid escape sequence \_
__doc__="""Protocol buffer.
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/proto/model_pb2.py:3777: DeprecationWarning: invalid escape sequence \_
__doc__="""Information about a single training query run for the model.
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/proto/model_pb2.py:3807: DeprecationWarning: invalid escape sequence \_
__doc__="""Protocol buffer.
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/proto/model_pb2.py:3986: DeprecationWarning: invalid escape sequence \_
__doc__="""Protocol buffer.
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/proto/model_reference_pb2.py:122: DeprecationWarning: invalid escape sequence \_
__doc__="""Id path of a model.
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/proto/standard_sql_pb2.py:311: DeprecationWarning: invalid escape sequence \_
__doc__="""The type of a variable, e.g., a function argument. Examples: INT64:
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/gapic/enums.py:24: DeprecationWarning: invalid escape sequence \_
"""
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/gapic/enums.py:120: DeprecationWarning: invalid escape sequence \_
"""
/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_v2/gapic/enums.py:138: DeprecationWarning: invalid escape sequence \_
"""
>>>
In ModelReference's repr, project_id is used where model_id should be.
See python-bigquery/google/cloud/bigquery/model.py, lines 432 to 435 at be5c8b1.
A system test, test_load_table_from_dataframe_w_explicit_schema(), consistently fails on the latest master branch, both under Python 2.7 and Python 3.8 (example Kokoro run). It is also consistently reproducible locally.
...
google.api_core.exceptions.BadRequest: 400 Error while reading data, error message: Invalid datetime value -62135596800000000 for field 'dt_col' of type 'INT64' (logical type 'TIMESTAMP_MICROS'): generic::out_of_range: Cannot return an invalid datetime value of -62135596800000000 microseconds relative to the Unix epoch. The range of valid datetime values is [0001-01-1 00:00:00, 9999-12-31 23:59:59.999999]
Ticket in the Google issue tracker: https://issuetracker.google.com/issues/151765076
With the Python BigQuery client v1.24.0.
As noted in googleapis/google-cloud-python#4452
google.cloud.bigquery.client.Client.update_table does a patch (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch)
But nothing does an update (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/update)
This makes it hard to update a view.
As a workaround, I can do this:
table_ref = bq.Table("x.y.z")
table_ref.view_query = "SELECT 42"
table = bq.Table.from_api_repr(
    client._call_api(
        bq.DEFAULT_RETRY, method="PUT", path=table_ref.path, data=table_ref.to_api_repr()
    )
)
Can the client have a function like replace_table that will call this endpoint?
Is your feature request related to a problem? Please describe.
In projects like superset, which use the SQLAlchemy connector, it can be quite slow to download large query results. The BigQuery Storage API speeds this up for to_dataframe / pandas, but not when used via the DB-API / SQLAlchemy. See: googleapis/python-bigquery-sqlalchemy#41
Describe the solution you'd like
When creating a DB-API Connection, provide a way to supply a BQ Storage client, in addition to a BQ client. Use this client to download results for the relevant methods in the Cursor object.
Describe alternatives you've considered
We could have a use_bqstorage_api option, but this would be inconsistent with the current constructor, which expects a client.
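One possible shape for this (the second constructor argument is hypothetical):
from google.cloud import bigquery, bigquery_storage_v1beta1
from google.cloud.bigquery import dbapi

bq_client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Hypothetical: Connection accepts an optional BQ Storage client and uses it
# to download results in Cursor.fetchall() and friends.
connection = dbapi.Connection(bq_client, bqstorage_client)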
/cc @yiga2
The synthtool should start using bazel instead of Artman.
This is similar to the proposal in googleapis/google-cloud-python#6042, but relates to specific jobs rather than client bindings. Simply managing multiple keys isn't really a viable solution for some use cases, such as superset (apache/superset#9182)
Even for less security-sensitive use cases, I think this would be a valuable affordance for developers to more clearly define code contracts.
I am running a Spark script that needs to perform a count(*) query 30x for every row in a dataframe. The dataframe on average has 25000 rows, which means that after completing, the script should have made 750000 requests/queries to the BigQuery table.
For some reason, with a large number of executors, I ran into the RuntimeError detailed in the stack trace below, where it seems google-api-core is unable to create a BigQuery client. Is this because my script is creating too many clients? Apologies if my code is incorrect or I am using the client wrong; I am new to BigQuery and have not used this API before. What would be the best way to use the BigQuery Storage API in this use case?
def query_cache(query):
    bqclient, bqstorageclient = clients()
    dataframe = (
        bqclient.query(query)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    return dataframe['f0_'][0]

@pandas_udf(schema(), PandasUDFType.GROUPED_MAP)
def calc_counts(df):
    query = "select count(*) from dataset.table where ...{some column filters}..."
    # Note: the original args=(query) is not a tuple, so apply() would unpack
    # the string; pass the query through a lambda instead.
    df['count'] = df.apply(lambda _row: query_cache(query), axis=1)
    return df  # GROUPED_MAP UDFs must return a DataFrame
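If thread exhaustion from per-row client creation is indeed the cause, a hedged sketch of a clients() helper that builds the clients once per worker process and reuses them:
from google.cloud import bigquery, bigquery_storage_v1beta1

_CLIENTS = None

def clients():
    global _CLIENTS
    if _CLIENTS is None:
        # Create both clients once and reuse them for every query.
        _CLIENTS = (
            bigquery.Client(),
            bigquery_storage_v1beta1.BigQueryStorageClient(),
        )
    return _CLIENTS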
py4j.protocol.Py4JJavaError: An error occurred while calling o526.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 54, ip-172-31-8-118.us-west-2.compute.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
for series in iterator:
File "<string>", line 1, in <lambda>
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 113, in wrapped
result = f(pd.concat(value_series, axis=1))
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/util.py", line 113, in wrapper
return f(*args, **kwargs)
File "/home/hadoop/metrics_bq.py", line 724, in calc_comps
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/frame.py", line 6928, in apply
return op.get_result()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 186, in get_result
return self.apply_standard()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 292, in apply_standard
self.apply_series_generator()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 321, in apply_series_generator
results[i] = self.f(v)
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 112, in f
return func(x, *args, **kwds)
File "/home/hadoop/metrics_bq.py", line 718, in count_comps_sql
File "/home/hadoop/metrics_bq.py", line 652, in query_cache
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1636, in to_dataframe
bqstorage_client=bqstorage_client, dtypes=dtypes
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1414, in _to_page_iterable
for item in bqstorage_download():
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 632, in _download_table_bqstorage
requested_streams=requested_streams,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1beta1/gapic/big_query_storage_client.py", line 318, in create_read_session
request, retry=retry, timeout=timeout, metadata=metadata
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
return wrapped_func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 277, in retry_wrapped_func
on_error=on_error,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 182, in retry_target
return target()
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
return callable_(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 689, in __call__
wait_for_ready, compression)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 676, in _blocking
),), self._context)
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 500, in grpc._cython.cygrpc.Channel.segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 368, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 362, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 222, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 250, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 56, in grpc._cython.cygrpc._get_metadata
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 31, in grpc._cython.cygrpc._spawn_callback_async
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 22, in grpc._cython.cygrpc._spawn_callback_in_thread
File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 119, in grpc._cython.cygrpc.ForkManagedThread.start
File "/home/hadoop/conda/lib/python3.7/threading.py", line 852, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: ("can't start new thread", 'occurred at index 0')
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
for series in iterator:
File "<string>", line 1, in <lambda>
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 113, in wrapped
result = f(pd.concat(value_series, axis=1))
File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/util.py", line 113, in wrapper
return f(*args, **kwargs)
File "/home/hadoop/metrics_bq.py", line 724, in calc_comps
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/frame.py", line 6928, in apply
return op.get_result()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 186, in get_result
return self.apply_standard()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 292, in apply_standard
self.apply_series_generator()
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 321, in apply_series_generator
results[i] = self.f(v)
File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 112, in f
return func(x, *args, **kwds)
File "/home/hadoop/metrics_bq.py", line 718, in count_comps_sql
File "/home/hadoop/metrics_bq.py", line 652, in query_cache
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1636, in to_dataframe
bqstorage_client=bqstorage_client, dtypes=dtypes
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1414, in _to_page_iterable
for item in bqstorage_download():
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 632, in _download_table_bqstorage
requested_streams=requested_streams,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1beta1/gapic/big_query_storage_client.py", line 318, in create_read_session
request, retry=retry, timeout=timeout, metadata=metadata
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
return wrapped_func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 277, in retry_wrapped_func
on_error=on_error,
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 182, in retry_target
return target()
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
return callable_(*args, **kwargs)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 689, in __call__
wait_for_ready, compression)
File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 676, in _blocking
),), self._context)
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 500, in grpc._cython.cygrpc.Channel.segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 368, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 362, in grpc._cython.cygrpc._segregated_call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 222, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 250, in grpc._cython.cygrpc._call
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 56, in grpc._cython.cygrpc._get_metadata
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 31, in grpc._cython.cygrpc._spawn_callback_async
File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 22, in grpc._cython.cygrpc._spawn_callback_in_thread
File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 119, in grpc._cython.cygrpc.ForkManagedThread.start
File "/home/hadoop/conda/lib/python3.7/threading.py", line 852, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: ("can't start new thread", 'occurred at index 0')
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
I am trying to create a new BigQuery table from about 26k Parquet files and have not been able to do so from the UI, the CLI, or the Python client. Below I describe how I tried via the Python client (so I could easily specify the target schema). I am running this on a MacBook Pro with Python 3.7.4.
This one is somewhat similar to googleapis/google-cloud-python#9207, but different in that the file_name field is not REPEATED but REQUIRED.
In a nutshell: I tried creating the table from scratch via the UI/CLI by just referencing my GCS path with a wildcard, and then by explicitly creating the schema via the Python API. Both approaches throw the same error.
fastparquet schema on *.snappy.parquet
- spark_schema:
- file_name1: BYTE_ARRAY, UTF8, OPTIONAL
- file_name2: BYTE_ARRAY, UTF8, OPTIONAL
- file_name3: BYTE_ARRAY, UTF8, OPTIONAL
- file_name4: BYTE_ARRAY, UTF8, OPTIONAL
- file_name5: BYTE_ARRAY, UTF8, OPTIONAL
- file_name6: BYTE_ARRAY, UTF8, OPTIONAL
- file_name7: INT32, OPTIONAL
- file_name8: BYTE_ARRAY, UTF8, OPTIONAL
- file_name9: BYTE_ARRAY, UTF8, OPTIONAL
- file_name10: INT32, OPTIONAL
- file_name11: BYTE_ARRAY, UTF8, OPTIONAL
- file_name12: BYTE_ARRAY, UTF8, OPTIONAL
- file_name13: INT32, DATE, OPTIONAL
- file_name14: BYTE_ARRAY, UTF8, REQUIRED
!gcloud auth application-default login
from google.cloud import bigquery
client = bigquery.Client()
table_id = "project.dataset.id"
schema = [
bigquery.SchemaField("field1", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field2", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field3", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field4", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field5", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field6", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field7", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("field8", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field9", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field10", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("field11", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field12", "STRING", mode="NULLABLE"),
bigquery.SchemaField("field13", "DATE", mode="NULLABLE"),
bigquery.SchemaField("field14", "STRING", mode="REQUIRED")
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table) # Make an API request.
print(
"Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)
created table project.dataset.id
dataset_id = 'dataset'
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
job_config.source_format = bigquery.SourceFormat.PARQUET
uri = "gs://gs_location/mde_to_gcp/*"
load_job = client.load_table_from_uri(
uri, dataset_ref.table("id"), job_config=job_config
) # API request
print("Starting job {}".format(load_job.job_id))
load_job.result() # Waits for table load to complete.
print("Job finished.")
destination_table = client.get_table(dataset_ref.table("id"))
print("Loaded {} rows.".format(destination_table.num_rows))
BadRequest Traceback (most recent call last)
in
12 print("Starting job {}".format(load_job.job_id))
13
---> 14 load_job.result() # Waits for table load to complete.
15 print("Job finished.")
16
/opt/anaconda3/lib/python3.7/site-packages/google/cloud/bigquery/job.py in result(self, timeout, retry)
776 self._begin(retry=retry)
777 # TODO: modify PollingFuture so it can pass a retry argument to done().
--> 778 return super(_AsyncJob, self).result(timeout=timeout)
779
780 def cancelled(self):
/opt/anaconda3/lib/python3.7/site-packages/google/api_core/future/polling.py in result(self, timeout)
125 # pylint: disable=raising-bad-type
126 # Pylint doesn't recognize that this is valid in this case.
--> 127 raise self._exception
128
129 return self._result
BadRequest: 400 Error while reading data, error message: Provided schema is not compatible with the file 'part-00198-12706474-345b-493c-b3a8-f6894c8bf91f-c000.snappy.parquet'. Field 'filename' is specified as NULLABLE in provided schema which does not match REQUIRED as specified in the file.
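One thing worth trying here, as a hedged sketch rather than a confirmed fix: pass an explicit schema on the load job itself, with modes that mirror the Parquet files' metadata, instead of relying on the destination table's schema (the error suggests the job saw 'filename' as NULLABLE while the file says REQUIRED). The field names below are the placeholders used above.
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.PARQUET
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
# Mirror the file's modes exactly; 'filename' (field14) is REQUIRED in the
# Parquet metadata, so declare it REQUIRED here as well.
job_config.schema = [
    bigquery.SchemaField("field13", "DATE", mode="NULLABLE"),
    bigquery.SchemaField("field14", "STRING", mode="REQUIRED"),
    # ... the remaining fields, with modes matching the file ...
]

load_job = client.load_table_from_uri(
    "gs://gs_location/mde_to_gcp/*",
    client.dataset("dataset").table("id"),
    job_config=job_config,
)
load_job.result()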
I'm working with the Google Vision API and trying to do some analysis in pandas. When I try to get the output into pandas, I get the NotImplementedError below.
from google.cloud import bigquery as bq

def get_data(_project_id, _table_id):
    client = bq.Client(project=_project_id)
    sql_ = """
        SELECT
            *
        FROM `{}`
    """.format(_table_id)
    df = client.query(sql_)  # starts the query; returns a QueryJob
    print("Query Complete. Converting to Dataframe")
    df = df.to_dataframe(progress_bar_type='tqdm')  # blocks and converts to a DataFrame
    return df

df = get_data(project_id, table_id)
ArrowNotImplementedError Traceback (most recent call last)
<ipython-input-17-15263a59b273> in <module>
----> 1 df = get_data(project_id, table_id)
<ipython-input-16-83704d37548f> in get_data(_project_id, _table_id, verbose)
18 print("Query Complete. Converting to Dataframe")
19
---> 20 df = df.to_dataframe(progress_bar_type='tqdm') # Converts to dataframe
21 return df
/opt/conda/lib/python3.7/site-packages/google/cloud/bigquery/job.py in to_dataframe(self, bqstorage_client, dtypes, progress_bar_type, create_bqstorage_client)
3372 dtypes=dtypes,
3373 progress_bar_type=progress_bar_type,
-> 3374 create_bqstorage_client=create_bqstorage_client,
3375 )
3376
/opt/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py in to_dataframe(self, bqstorage_client, dtypes, progress_bar_type, create_bqstorage_client)
1729 create_bqstorage_client=create_bqstorage_client,
1730 )
-> 1731 df = record_batch.to_pandas()
1732 for column in dtypes:
1733 df[column] = pandas.Series(df[column], dtype=dtypes[column])
/opt/conda/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib._PandasConvertible.to_pandas()
/opt/conda/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.Table._to_pandas()
/opt/conda/lib/python3.7/site-packages/pyarrow/pandas_compat.py in table_to_blockmanager(options, table, categories, ignore_metadata, types_mapper)
764 _check_data_column_metadata_consistency(all_columns)
765 columns = _deserialize_column_index(table, all_columns, column_indexes)
--> 766 blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
767
768 axes = [columns, index]
/opt/conda/lib/python3.7/site-packages/pyarrow/pandas_compat.py in _table_to_blocks(options, block_table, categories, extension_columns)
1099 columns = block_table.column_names
1100 result = pa.lib.table_to_blocks(options, block_table, categories,
-> 1101 list(extension_columns.keys()))
1102 return [_reconstruct_block(item, columns, extension_columns)
1103 for item in result]
/opt/conda/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.table_to_blocks()
/opt/conda/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
ArrowNotImplementedError: Not implemented type for Arrow list to pandas: struct<score: double, description: string>
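Until pyarrow can convert list-of-struct columns to pandas, one workaround sketch is to flatten the repeated struct in SQL so only scalar columns reach to_dataframe(). This assumes a hypothetical column labels of type ARRAY<STRUCT<score FLOAT64, description STRING>> standing in for the Vision output:
from google.cloud import bigquery

client = bigquery.Client(project=project_id)
# UNNEST the repeated struct so the result set contains only scalar
# columns, which pyarrow can convert to pandas without the struct error.
sql_ = """
    SELECT t.* EXCEPT (labels), label.score, label.description
    FROM `{}` AS t, UNNEST(t.labels) AS label
""".format(table_id)
df = client.query(sql_).to_dataframe(progress_bar_type='tqdm')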
I have an issue where it is not possible to upload a Pandas DataFrame with a repeated field to BigQuery. It is very much related to an issue I've had earlier: googleapis/google-cloud-python#8093
Since that has been resolved (by being able to specify the schema), I've created a separate issue. I also couldn't find issues related to REPEATED fields.
Mac OS X 10.14.5
Python 3.6.8
Packages:
google-api-core==1.14.2
google-auth==1.6.3
google-cloud-bigquery==1.19.0
google-cloud-core==1.0.3
google-cloud-iam==0.2.1
google-cloud-logging==1.12.1
google-resumable-media==0.3.3
googleapis-common-protos==1.6.0
Also: passing a JobConfig doesn't change the error.
import pandas as pd
from google.cloud import bigquery
PROJECT = "MY-PROJECT"
DATASET = "MY_DATASET"
TABLE = "MY_TABLE"
# My table schema
schema = [
bigquery.SchemaField("foo", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("bar", "FLOAT", mode="REPEATED"),
]
# Set everything up
client = bigquery.Client(PROJECT)
dataset_ref = client.dataset(DATASET)
table_ref = dataset_ref.table(TABLE)
# Delete the table if it exists
print("Deleting table if exists...")
client.delete_table(table_ref, not_found_ok=True)
# Create the table
print("Creating table...")
table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY
)
table = client.create_table(table, exists_ok=True)
print("Table schema:")
print(table.schema)
print("Table partitioning:")
print(table.time_partitioning)
# Upload data to partition
table_partition = TABLE + "$20190522"
table_ref = dataset_ref.table(table_partition)
df = pd.DataFrame({"foo": [1, 2, 3], "bar": [[2.0, 3.0], [3.0, 4.0], [4.0, 5.0]]})
job_config = bigquery.LoadJobConfig(schema=schema)
client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
Traceback (most recent call last):
File "test.py", line 51, in <module>
client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
File "google/cloud/bigquery/job.py", line 734, in result
return super(_AsyncJob, self).result(timeout=timeout)
File "google/api_core/future/polling.py", line 127, in result
raise self._exception
google.api_core.exceptions.BadRequest: 400 Error while reading data, error message:
Provided schema is not compatible with the file 'prod-scotty-******'.
Field 'bar' is specified as REPEATED in provided schema
which does not match REQUIRED as specified in the file.
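A workaround sketch that bypasses the parquet serialization path entirely: load the rows as JSON with the same explicit schema. This assumes a client version recent enough to provide load_table_from_json (newer than the 1.19.0 used here); df, schema, client, and table_ref are the objects from the snippet above.
# Convert the DataFrame to row dicts and load them as JSON; the REPEATED
# mode survives because no parquet round-trip is involved.
rows = df.to_dict(orient="records")
job_config = bigquery.LoadJobConfig(schema=schema)
client.load_table_from_json(rows, table_ref, job_config=job_config).result()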
Hello!
I am a new user of the BigQuery Python package, and I have encountered a problem.
To simplify, I have a simple pandas DataFrame with a None value:
df = pd.DataFrame({'x': [1, 2, None, 4]})
For pandas, this will be a NaN, and the dtype of the column will be float64 by default. But I would like to push this DataFrame to BigQuery with an integer format for the column x (the None becoming a NULL).
Thanks to newer versions of pandas (>= 0.24), we can change the type of the column and keep the NaN value:
df['x'] = df['x'].astype('Int64')
print(df.dtypes)
# Int64
But when I try to push this DataFrame to BigQuery, I encounter an ArrowTypeError:
ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column x with type Int64')
I found some solutions that get the job done, but they always require updating the table after load_table_from_dataframe... I think there must be a better way to do this. Any ideas, please?
At the end, I would like to have this table in BigQuery:
Line | x
-----|-----
   1 | 1
   2 | 2
   3 | null
   4 | 4
with x as an INTEGER type.
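One workaround sketch, assuming a client version that honors an explicit schema in load_table_from_dataframe: cast the column to object with None for the missing values and declare the INTEGER type in the job config.
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()
df = pd.DataFrame({'x': [1, 2, None, 4]})
# Cast to object and replace NaN with None so pyarrow writes NULLs
# instead of failing on the pandas extension dtype Int64.
df['x'] = df['x'].astype(object).where(df['x'].notnull(), None)

job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField("x", "INTEGER", mode="NULLABLE")]
)
table_ref = client.dataset('test_dataset').table('test')
client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
For reference, the failing repro: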
import pandas as pd
from google.cloud import bigquery
print(pd.__version__)
print(bigquery.__version__)
df = pd.DataFrame({'x': [1, 2, None, 4]})
df['x'] = df['x'].astype('Int64')
print(df.dtypes)
client = bigquery.Client()
dataset_ref = client.dataset('test_dataset')
table_ref = dataset_ref.table('test')
client.load_table_from_dataframe(df, table_ref).result()
ArrowTypeError Traceback (most recent call last)
<ipython-input-4-fe43ea977e67> in <module>
14 table_ref = dataset_ref.table('test')
15
---> 16 client.load_table_from_dataframe(df, table_ref).result()
/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py in load_table_from_dataframe(self, dataframe, destination, num_retries, job_id, job_id_prefix, location, project, job_config)
1045 """
1046 buffer = six.BytesIO()
-> 1047 dataframe.to_parquet(buffer)
1048
1049 if job_config is None:
/usr/local/lib/python3.7/site-packages/pandas/core/frame.py in to_parquet(self, fname, engine, compression, index, partition_cols, **kwargs)
2201 to_parquet(self, fname, engine,
2202 compression=compression, index=index,
-> 2203 partition_cols=partition_cols, **kwargs)
2204
2205 @Substitution(header='Whether to print column labels, default True')
/usr/local/lib/python3.7/site-packages/pandas/io/parquet.py in to_parquet(df, path, engine, compression, index, partition_cols, **kwargs)
250 impl = get_engine(engine)
251 return impl.write(df, path, compression=compression, index=index,
--> 252 partition_cols=partition_cols, **kwargs)
253
254
/usr/local/lib/python3.7/site-packages/pandas/io/parquet.py in write(self, df, path, compression, coerce_timestamps, index, partition_cols, **kwargs)
111 else:
112 from_pandas_kwargs = {'preserve_index': index}
--> 113 table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
114 if partition_cols is not None:
115 self.api.parquet.write_to_dataset(
/usr/local/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pandas()
/usr/local/lib/python3.7/site-packages/pyarrow/pandas_compat.py in dataframe_to_arrays(df, schema, preserve_index, nthreads, columns, safe)
466 arrays = [convert_column(c, t)
467 for c, t in zip(columns_to_convert,
--> 468 convert_types)]
469 else:
470 from concurrent import futures
/usr/local/lib/python3.7/site-packages/pyarrow/pandas_compat.py in <listcomp>(.0)
465 if nthreads == 1:
466 arrays = [convert_column(c, t)
--> 467 for c, t in zip(columns_to_convert,
468 convert_types)]
469 else:
/usr/local/lib/python3.7/site-packages/pyarrow/pandas_compat.py in convert_column(col, ty)
461 e.args += ("Conversion failed for column {0!s} with type {1!s}"
462 .format(col.name, col.dtype),)
--> 463 raise e
464
465 if nthreads == 1:
/usr/local/lib/python3.7/site-packages/pyarrow/pandas_compat.py in convert_column(col, ty)
455 def convert_column(col, ty):
456 try:
--> 457 return pa.array(col, type=ty, from_pandas=True, safe=safe)
458 except (pa.ArrowInvalid,
459 pa.ArrowNotImplementedError,
/usr/local/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib.array()
/usr/local/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib._ndarray_to_array()
/usr/local/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib._ndarray_to_type()
/usr/local/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column x with type Int64')
Thank you :)
python --version
pip --version
google-cloud-bigquery version (pip show google-cloud-bigquery):
Name: google-cloud-bigquery
Version: 1.24.0
Summary: Google BigQuery API client library
Home-page: https://github.com/GoogleCloudPlatform/google-cloud-python
Author: Google LLC
Author-email: [email protected]
License: Apache 2.0
Location: /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages
Requires: protobuf, google-api-core, six, google-cloud-core, google-auth, google-resumable-media
Required-by:
I made the following change to /Library/Python/3.7/site-packages/google/cloud/bigquery/schema.py:
##from six.moves import collections_abc
try:
from collections import abc as collections_abc
except ImportError: # Python 2.7
import collections as collections_abc
Is your feature request related to a problem? Please describe.
Some job failures are just due to current conditions and the job will succeed if started from the beginning. The problem is that this isn't a simple "retry" because the job configuration is mutable.
For example, BigQuery automatically populates a destination table if not set. In this case, the destination table should not be part of the retried request. In my opinion, this kind of logic is outside the scope of the client libraries, as it's not clear, just from the job resource, when the destination table needs to be cleared.
Describe the solution you'd like
- Add a create_job method that takes any JobConfig object.
- Add a configuration property to job classes.
- Retry jobs that fail with 403 rateLimitExceeded (possibly with resetting the destination table if it was a query job).
Describe alternatives you've considered
Add a .retry() method to job classes. This is problematic, primarily because the configuration may have changed since the job was initially created, leading to unintended consequences or hard-to-debug failures.
Additional context
See customer request for a method to retry any job at googleapis/google-cloud-python#5555
See: googleapis/google-cloud-python#6136, and the TODO left in the code by @jba:
def result(self, timeout=None):
...
if self.state is None:
self._begin()
# TODO: modify PollingFuture so it can pass a retry argument to done().
return super(_AsyncJob, self).result(timeout=timeout)
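To make the proposal concrete, a hypothetical usage sketch; create_job and the configuration property do not exist yet and are exactly what is being proposed here:
from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT 1")
try:
    job.result()
except Exception:
    # Proposed: rebuild the request from the job's configuration. For a
    # query job, the auto-populated destination table would be cleared
    # before resubmitting.
    config = job.configuration           # proposed property
    retried = client.create_job(config)  # proposed method
    retried.result()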
Cloning this issue over from google-cloud-python
repo per the instructions at the end of the comment chain.
This issue still exists as far as I can tell. I may be wrong and simply not doing it right, in which case I would love some insight into how this is done. The comment chain was unresolved, and there appear to be plenty of questions about the proper use of these params and how to set them. I would love to at least see an update to the documentation if this does actually work.
A quick recap and example of the issue, quoting a comment from the previous thread:
Now I understand the difference; however, I've tried all of these with no change in behavior.
bq._http.adapters['https://']._pool_connections = 200
bq._http.adapters['https://']._pool_maxsize = 200
bq._http.adapters['http://']._pool_maxsize = 200
bq._http.adapters['http://']._pool_connections = 200
bq._http_internal.adapters['https://']._pool_maxsize=100
bq._http_internal.adapters['https://']._pool_connections = 100
bq._http_internal.adapters['http://']._pool_maxsize = 100
bq._http_internal.adapters['http://']._pool_connections = 100
For example, if I use 16 threads, I encounter the same warnings. Is there a different syntax I should be using here, other than directly modifying these attributes?
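For what it's worth, a sketch of an alternative that avoids poking at private adapter attributes: build an authorized requests session with a larger pool and hand it to the client. The _http constructor argument is itself private API, so this is an assumption-laden workaround, not a documented recipe:
import google.auth
from google.auth.transport.requests import AuthorizedSession
from requests.adapters import HTTPAdapter
from google.cloud import bigquery

credentials, project = google.auth.default()
session = AuthorizedSession(credentials)
# Mount adapters with a larger connection pool before any requests are made.
adapter = HTTPAdapter(pool_connections=200, pool_maxsize=200)
session.mount("https://", adapter)
session.mount("http://", adapter)

bq = bigquery.Client(project=project, _http=session)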
Thanks!
The Python BigQuery API documentation indicates that arrays are possible; however, when loading from a pandas DataFrame to BigQuery, there is a pyarrow struct issue.
The only way around it, it seems, is to drop the affected columns and then use JSON normalization to build a separate table.
from google.cloud import bigquery
project = 'lake'
client = bigquery.Client(credentials=credentials, project=project)
dataset_ref = client.dataset('XXX')
table_ref = dataset_ref.table('RAW_XXX')
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.write_disposition = 'WRITE_TRUNCATE'
client.load_table_from_dataframe(appended_data, table_ref,job_config=job_config).result()
This is the error received: NotImplementedError: struct
The reason I wanted to use this API is that it advertises nested array support, which is perfect for our data lake in BQ, but I assume this doesn't work?
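A workaround sketch under two assumptions: the client version provides load_table_from_json, and the nested schema is declared explicitly rather than autodetected (the id/events field names below are made up for illustration):
from google.cloud import bigquery

schema = [
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField(
        "events", "RECORD", mode="REPEATED",
        fields=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("ts", "TIMESTAMP"),
        ],
    ),
]
job_config = bigquery.LoadJobConfig(schema=schema)
# Serialize the DataFrame to row dicts so the nested/repeated structure
# is loaded as JSON instead of going through the parquet struct path.
rows = appended_data.to_dict(orient="records")
client.load_table_from_json(rows, table_ref, job_config=job_config).result()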
The BigQuery API documentation of ExtractJobConfiguration shows that the source can be a sourceTable or a sourceModel.
Related Java issue 202
/cc: @shollyman
I'm using pandas with google-cloud-python:
my_df = get_sessions()  # returns a DataFrame with a 'created' column of dtype datetime64[ns], e.g. "2020-01-08 08:00:00"
my_df['created'] = pd.to_datetime(my_df['created'], format='%Y-%m-%d %H:%M:%S').astype('datetime64[ns]')
res = bigquery_client.client.load_table_from_dataframe(my_df, table_id)
res.result()
# Expected: "2020-01-08 08:00:00". Instead the value is loaded as INVALID, or as a wrong value such as "0013-03-01T03:05:00". @plamut please help
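A sketch of what to try first, assuming the installed client honors an explicit schema for DataFrame loads: pin the column's BigQuery type in the job config so serialization does not have to guess.
from google.cloud import bigquery

job_config = bigquery.LoadJobConfig(
    # Assumption: the column should land as TIMESTAMP; use DATETIME instead
    # if the target column is a wall-clock value without a timezone.
    schema=[bigquery.SchemaField("created", "TIMESTAMP")]
)
res = bigquery_client.client.load_table_from_dataframe(
    my_df, table_id, job_config=job_config
)
res.result()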
I just updated my problem. Here
Thanks!
Hi,
I would like to use BigQuery's magic command to run a query that creates and stores the result in a new BigQuery table.
I'm trying to use the following method:
%%bigquery --use_legacy_sql --project projectName --destination_table datasetname.tablename
But I'm currently hitting this issue:
unrecognized arguments: --destination_table datasetname.tablename
This argument is mentioned in the source code for google.cloud.bigquery.magics.
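For reference, a sketch of the usage on a newer client release; the assumption is that the 1.8.0 reported below simply predates the --destination_table argument, so upgrading google-cloud-bigquery should make the magic accept it:
%load_ext google.cloud.bigquery

%%bigquery --project projectName --destination_table datasetname.tablename
SELECT 17 AS answer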
Thanks for your help!
OS: Linux dc32b7e8763a 4.9.0-6-amd64 #1 SMP Debian 4.9.82-1+deb9u3 (2018-03-02) x86_64 x86_64 x86_64 GNU/Linux
Python version: Python 2.7.6
google-cloud-bigquery: 1.8.0
We're getting flaky 400 BadRequest errors on our query jobs. We've been seeing this issue for a while on and off, but last night starting at around 7pm we saw a spike in these failures.
These errors are not caught by the default Retry objects because 400 usually signifies a malformed query or a missing table, rather than a transient error.
A fix might be to add a clause catching 400s with this exact error message to _should_retry at https://github.com/googleapis/google-cloud-python/blob/master/bigquery/google/cloud/bigquery/retry.py#L30 and/or RETRY_PREDICATE at https://github.com/googleapis/google-cloud-python/blob/master/api_core/google/api_core/future/polling.py#L32.
from google.api_core.future import polling
from google.cloud.bigquery import retry as bq_retry
query_job = self.gclient.query(query, job_config=config, retry=bq_retry.DEFAULT_RETRY.with_deadline(max_wait_secs))
query_job._retry = polling.DEFAULT_RETRY.with_deadline(max_wait_secs)
return query_job.result(timeout=max_wait_secs)
One example:
File "/opt/conda/lib/python2.7/site-packages/verily/bigquery_wrapper/bq.py", line 108, in _wait_for_job
return query_job.result(timeout=max_wait_secs)
File "/opt/conda/lib/python2.7/site-packages/google/cloud/bigquery/job.py", line 2762, in result
super(QueryJob, self).result(timeout=timeout)
File "/opt/conda/lib/python2.7/site-packages/google/cloud/bigquery/job.py", line 703, in result
return super(_AsyncJob, self).result(timeout=timeout)
File "/opt/conda/lib/python2.7/site-packages/google/api_core/future/polling.py", line 122, in result
self._blocking_poll(timeout=timeout)
File "/opt/conda/lib/python2.7/site-packages/google/cloud/bigquery/job.py", line 2736, in _blocking_poll
super(QueryJob, self)._blocking_poll(timeout=timeout)
File "/opt/conda/lib/python2.7/site-packages/google/api_core/future/polling.py", line 101, in _blocking_poll
retry_(self._done_or_raise)()
File "/opt/conda/lib/python2.7/site-packages/google/api_core/retry.py", line 270, in retry_wrapped_func
on_error=on_error,
File "/opt/conda/lib/python2.7/site-packages/google/api_core/retry.py", line 179, in retry_target
return target()
File "/opt/conda/lib/python2.7/site-packages/google/api_core/future/polling.py", line 80, in _done_or_raise
if not self.done():
File "/opt/conda/lib/python2.7/site-packages/google/cloud/bigquery/job.py", line 2723, in done
location=self.location,
File "/opt/conda/lib/python2.7/site-packages/google/cloud/bigquery/client.py", line 672, in _get_query_results
retry, method="GET", path=path, query_params=extra_params
File "/opt/conda/lib/python2.7/site-packages/google/cloud/bigquery/client.py", line 382, in _call_api
return call()
File "/opt/conda/lib/python2.7/site-packages/google/api_core/retry.py", line 270, in retry_wrapped_func
on_error=on_error,
File "/opt/conda/lib/python2.7/site-packages/google/api_core/retry.py", line 179, in retry_target
return target()
File "/opt/conda/lib/python2.7/site-packages/google/cloud/_http.py", line 319, in api_request
raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 GET https://www.googleapis.com/bigquery/v2/projects/packard-campbell-synth/queries/9bcea2cb-1747-4a1e-9ac8-e1de40f00d08?timeoutMs=10000&location=US&maxResults=0: The job encountered an internal error during execution and was unable to complete successfully.
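A sketch of the suggested predicate change, written as a user-side workaround until something lands upstream; matching on the error-message text is an assumption about that message staying stable, and self.gclient, query, and config reuse the wrapper variables from the snippet above:
from google.api_core import exceptions
from google.api_core.retry import if_transient_error
from google.cloud.bigquery import retry as bq_retry

def _should_retry(exc):
    # Additionally retry 400s that report a BigQuery-internal execution error.
    if isinstance(exc, exceptions.BadRequest):
        return "internal error during execution" in str(exc)
    return if_transient_error(exc)

custom_retry = bq_retry.DEFAULT_RETRY.with_predicate(_should_retry)
query_job = self.gclient.query(query, job_config=config, retry=custom_retry)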
After adding the new timeout
parameter to various public methods (#9987), we should demonstrate its usage in the code samples.
Users should be aware of this new feature and should probably use it by default, to avoid sporadic issues where a method "gets stuck" at the transport layer.
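A minimal sketch of what such a sample could look like; the methods are existing client calls that gained the parameter in #9987, and the 30-second value is arbitrary:
from google.cloud import bigquery

client = bigquery.Client()
# Give each API call at most 30 seconds at the transport layer instead of
# potentially blocking indefinitely.
dataset = client.get_dataset("my_project.my_dataset", timeout=30.0)
table = client.get_table("my_project.my_dataset.my_table", timeout=30.0)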
I noticed that building the docs locally on master
fails due to the following error:
sphinx.errors.SphinxWarning: <workspace_dir>/python-bigquery/google/cloud/bigquery_v2/types.py:docstring of google.cloud.bigquery_v2.types.DeleteModelRequest.dataset_id:1:duplicate object description of google.cloud.bigquery_v2.types.DeleteModelRequest.dataset_id, other instance in gapic/v2/types, use :noindex: for one of them
Checking what gets injected into the types module does not reveal anything; the DeleteModelRequest type is not duplicated.
It turns out that the issue does not occur with older versions of Sphinx (< 3.x), so it appears to be a bug in the Sphinx package itself. A workaround is to pin that dependency to an older version.
Preferably, though, the new warnings emitted in Sphinx 3 should be addressed at their root causes.
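Concretely, the pin, sketched as a one-liner; the version bound follows from the observation above that Sphinx < 3.x is unaffected:
pip install "sphinx<3.0.0"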
I know BigQuery jobs are asynchronous by default. However, I am struggling to make my data pipeline async end-to-end.
Looking at this JS example, I thought it would be most Pythonic to make a BigQuery job awaitable. However, I can't get that to work in Python, i.e. await client.query(query) raises errors. Looking at the source code, I don't see which method returns an awaitable object.
I have little experience writing async Python code, and found this example that wraps jobs in an async def coroutine:
import asyncio
from google.cloud import bigquery

class BQApi(object):
    def __init__(self):
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
        job = self.api.query(query, **kwargs)
        task = asyncio.create_task(self.coroutine_job(job))
        return await task

    @staticmethod
    async def coroutine_job(job):
        return job.result()  # blocking call; this still blocks the event loop
The google.api_core.operation.Operation docs show how to use add_done_callback to asynchronously wait for long-running operations. I have tried that, but the following yields AttributeError: 'QueryJob' object has no attribute '_condition':
from concurrent.futures import ThreadPoolExecutor, as_completed
from google.cloud import bigquery

bq = bigquery.Client()
query1 = 'SELECT 1'
query2 = 'SELECT 2'

def my_callback(future):
    result = future.result()

operations = [bq.query(query1), bq.query(query2)]
[operation.add_done_callback(my_callback) for operation in operations]
results2 = []
for future in as_completed(operations):
    results2.append(list(future.result()))
Given that jobs are already asynchronous, would it make sense to add a method that returns an awaitable?
Or am I missing something, and is there a Pythonic way to use the BigQuery client with the async/await pattern?
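For what it's worth, a sketch of bridging a job into asyncio without any new client API, by running the blocking result() call in the default executor; this is plain asyncio, nothing BigQuery-specific:
import asyncio
from google.cloud import bigquery

client = bigquery.Client()

async def run_query(sql):
    loop = asyncio.get_event_loop()
    job = client.query(sql)  # starts the job without blocking
    # job.result() blocks, so run it in a worker thread and await it.
    return await loop.run_in_executor(None, job.result)

async def main():
    rows1, rows2 = await asyncio.gather(run_query("SELECT 1"), run_query("SELECT 2"))
    print(list(rows1), list(rows2))

asyncio.run(main())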
When converting rows to JSON, repeated schema fields are deepcopied, causing noticeable slowdowns with complex schemas.
Since conversion is performed recursively, complex schemas containing multiple levels of repeated fields (e.g. a repeated list of structs, each containing multiple fields of repeated structs) are copied multiple times when converting a single row, causing even further slowdown due to redundant copying.
I've created a proposal for removing the deepcopy operation and will reference the relevant PR.