jeppe742 / DeltaLakeReader
Read Delta tables without any Spark
License: Apache License 2.0
from deltalake import DeltaTable

# fs is an fsspec-compatible filesystem (s3fs, adlfs, ...)
df = DeltaTable("path_to_delta_table", file_system=fs).to_pandas()
In the latest release, 0.2.2, I have been reading a Delta table from S3 to which an update changed only a few rows. When I read the full Delta table, the dataframe contains both the initial values and the updated values. But I only need the latest snapshot, i.e. the state after the latest update, not every update ever made. Am I missing something? For validation, I read the same table through a Spark context, and it returns only the latest snapshot. Any help?
For reference, I have attached a snapshot of the read from DeltaTable and the read from Spark; the dataframes have two rows and one row respectively.
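For context, this is roughly how the Delta log determines the current snapshot: remove actions in later commits drop files that earlier commits added, and a reader that only collects add actions will return both the old and the updated rows. A minimal sketch, assuming log_entries is a hypothetical list of parsed JSON actions in commit order:

files = set()
for entry in log_entries:
    if "add" in entry:
        files.add(entry["add"]["path"])
    elif "remove" in entry:
        # a data file rewritten by an update must leave the snapshot
        files.discard(entry["remove"]["path"])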
I am using the reader for a project and it has worked for a few of my datasets, but I was getting significantly fewer rows of data back on my new dataset.
Upon investigation I found that when checkpoint files reach 100MB they are sharded out into multiple files, but the reader has no logic for handling this.
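For reference, a hedged sketch of how the sharded checkpoint could be read. Per the Delta protocol, a multi-part checkpoint is written as <version>.checkpoint.<part>.<num_parts>.parquet with zero-padded numbers, and _last_checkpoint then carries a "parts" field; fs and log_path are assumed to be an fsspec filesystem and the _delta_log path:

import json
import pyarrow as pa
import pyarrow.parquet as pq

def read_checkpoint(fs, log_path):
    # _last_checkpoint records the checkpoint version and,
    # when sharded, the number of parts
    with fs.open(f"{log_path}/_last_checkpoint") as f:
        info = json.load(f)
    version, parts = info["version"], info.get("parts")
    if parts:  # sharded: <version>.checkpoint.<part>.<num_parts>.parquet
        names = [
            f"{log_path}/{version:020}.checkpoint.{p:010}.{parts:010}.parquet"
            for p in range(1, parts + 1)
        ]
    else:  # single file: <version>.checkpoint.parquet
        names = [f"{log_path}/{version:020}.checkpoint.parquet"]
    return pa.concat_tables(pq.read_table(fs.open(n)) for n in names)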
This dataset works fine for us:
Upon investigating the code, I believe this is where the bug is occurring:
While trying to load a Delta table, I get a schemaString KeyError in the apply_partial_logs function. The issue is due to an imperfection in a checkpoint file: the metadata in one of the checkpoint files doesn't have a schemaString. I need to skip that file and load the Delta table, so we need to add error-handling functionality for this case.
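A minimal sketch of the requested error handling, staying close to the reader's _apply_partial_logs loop; meta_data is one parsed log entry, and the fallback behaviour (keep the previous schema) is an assumption:

# skip entries whose metaData lacks schemaString instead of raising KeyError
schema_string = meta_data.get("metaData", {}).get("schemaString")
if schema_string is not None:
    self.schema = schema_from_string(schema_string)
# otherwise keep the previously applied schema and continue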
from deltalake import DeltaTable

df = DeltaTable("path_to_delta_table", file_system=fs).to_pandas()
I am using the latest version of delta-lake-reader and reading a Delta table with the function above. Recently I updated my Delta table using merge schema, adding new columns to the table. Now, when I read the table after the merge, I don't see the new columns in the results. I tested this by reading through a Spark context, and there I do see the new columns:
spark.read.format("delta").load()
Any Help?
Thank You
Nitin
Decimal types are not currently supported; it would be great to include them.
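A hedged sketch of what the mapping could look like, assuming a type-mapping helper along the lines of the one in deltalake/schema.py; pa.decimal128 is the standard pyarrow constructor:

import re
import pyarrow as pa

def map_decimal(type_string):
    # Delta encodes decimals as "decimal(precision,scale)"
    match = re.fullmatch(r"decimal\((\d+),\s*(\d+)\)", type_string)
    if match is None:
        raise ValueError(f"not a decimal type: {type_string}")
    precision, scale = int(match.group(1)), int(match.group(2))
    return pa.decimal128(precision, scale)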
Seems like self.schema is not getting defined in the __init__ function. Would you please check the code and see how you would define its default value?
Hi @jeppe742, would you mind if I make a PR copying the exact delta-rs license file? I need the license in order to use this library internally at our firm.
Thanks!
Using from fsspec.implementations.hdfs import PyArrowHDFS and initializing the filesystem with file_system = PyArrowHDFS("127.0.0.1", 9000, "root"), I got this error:
Traceback (most recent call last):
  File "/hadoop/read.py", line 353, in <module>
    readData()
  File "/hadoop/read.py", line 344, in readData
    df1 = DeltaTable('/zy/example_table_path', file_system=file_system).to_table(filter=ds.field("id") >= 100).to_pandas()
  File "/hadoop/read.py", line 147, in __init__
    self._as_newest_version()
  File "/hadoop/read.py", line 258, in _as_newest_version
    self._apply_partial_logs(version=self.checkpoint + 9)
  File "/hadoop/read.py", line 216, in _apply_partial_logs
    print(list(self.filesystem.open(log_file)))
TypeError: 'HDFSFile' object is not iterable
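A hedged workaround sketch: since HDFSFile does not support line iteration, the commit log could be read whole and split on newlines; filesystem and log_file are the names from the traceback above:

# read the whole commit log and split it, instead of iterating the file
with filesystem.open(log_file) as f:
    lines = [line for line in f.read().split(b"\n") if line.strip()]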
Unfortunately, I cannot use the latest s3fs, because the latest delta-lake-reader[aws]==0.2.14 requires s3fs<2023, and on s3fs==2022.11.0 I am getting a known s3fs issue.
Why that issue was closed, I do not know, since it hit lots of folks even on one of the latest versions, i.e. 2023.1.0.
Also, I would like to point out that my lambdas have policy permissions for all the S3 objects and buckets through the IAM role.
Could an update be released that can use the latest s3fs==2023.10.0, so that I would know to address this as an s3fs issue, please?
[ERROR] PermissionError: Forbidden
Traceback (most recent call last):
File "/var/task/inquire-data-set.py", line 118, in lambda_handler
dt = DeltaTable(s3_path, file_system=fs)
File "/var/lang/lib/python3.10/site-packages/deltalake/deltatable.py", line 40, in __init__
if not self._is_delta_table():
File "/var/lang/lib/python3.10/site-packages/deltalake/deltatable.py", line 62, in _is_delta_table
return self.filesystem.exists(f"{self.log_path}")
File "/var/lang/lib/python3.10/site-packages/fsspec/asyn.py", line 113, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/var/lang/lib/python3.10/site-packages/fsspec/asyn.py", line 98, in sync
raise return_result
File "/var/lang/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/var/lang/lib/python3.10/site-packages/s3fs/core.py", line 946, in _exists
await self._info(path, bucket, key, version_id=version_id)
File "/var/lang/lib/python3.10/site-packages/s3fs/core.py", line 1210, in _info
out = await self._call_s3(
File "/var/lang/lib/python3.10/site-packages/s3fs/core.py", line 339, in _call_s3
return await _error_wrapper(
File "/var/lang/lib/python3.10/site-packages/s3fs/core.py", line 139, in _error_wrapper
raise err
Hi,
I am facing an issue while reading Delta tables from Azure Data Lake using AzureBlobFileSystem from adlfs. I am not sure if it's an issue with syntax or library versions. Kindly help!
Libraries:
from azure.identity import ClientSecretCredential
from adlfs import AzureBlobFileSystem
from deltalake import DeltaTable

token_credential = ClientSecretCredential(active_directory_tenant_id, active_directory_client_id,
                                          active_directory_client_secret)
fs = AzureBlobFileSystem(
    account_name=storage_account,
    credential=token_credential
)
dt = DeltaTable('containername/folder1/folder2/tablefolder', file_system=fs)  # tablefolder contains the _delta_log
Error:
TypeError Traceback (most recent call last)
<ipython-input-24-35f4abc79a4b> in <module>
7 pathtosa = deltaLakeFinalUri + conatainerPathToDeltaTable + deltaTableName
8 print(pathtosa)
----> 9 dt = DeltaTable('dqdata/RAW/Revenue/actual_revenue_and_margin', file_system=fs)
10 df=dt.to_pandas()
~\AppData\Roaming\Python\Python38\site-packages\deltalake\deltatable.py in __init__(self, path, file_system)
43 Make sure you point to the root of a delta table"""
44 )
---> 45 self._as_newest_version()
46
47 # The PyArrow Dataset is exposed by a factory class,
~\AppData\Roaming\Python\Python38\site-packages\deltalake\deltatable.py in _as_newest_version(self)
149 # apply remaining versions. This can be a maximum of 9 versions.
150 # we will just break when we don't find any newer logs
--> 151 self._apply_partial_logs(version=self.checkpoint + 9)
152
153 def to_table(self, *args, **kwargs):
~\AppData\Roaming\Python\Python38\site-packages\deltalake\deltatable.py in _apply_partial_logs(self, version)
130 elif "metaData" in meta_data.keys():
131 schema_string = meta_data["metaData"]["schemaString"]
--> 132 self.schema = schema_from_string(schema_string)
133 # Stop if we have reatched the desired version
134 if self.version == version:
~\AppData\Roaming\Python\Python38\site-packages\deltalake\schema.py in schema_from_string(schema_string)
17 pa_type = map_type(type)
18
---> 19 fields.append(pa.field(name, pa_type, nullable=nullable, metadata=metadata))
20 return pa.schema(fields)
21
C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\types.pxi in pyarrow.lib.field()
C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\types.pxi in pyarrow.lib.ensure_metadata()
C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\types.pxi in pyarrow.lib.KeyValueMetadata.__init__()
C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\lib.cp38-win_amd64.pyd in string.from_py.__pyx_convert_string_from_py_std__in_string()
TypeError: expected bytes, int found
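For context, a hedged sketch of a plausible cause and fix, not a confirmed one: pyarrow field metadata must map strings to strings or bytes, while Delta column metadata can contain integers, so the values could be stringified before pa.field is called. name, pa_type, nullable, metadata and fields are the variables from the quoted schema_from_string:

import pyarrow as pa

# KeyValueMetadata only accepts string/bytes values, but Delta column
# metadata may contain ints, which triggers "expected bytes, int found"
safe_metadata = {str(k): str(v) for k, v in (metadata or {}).items()}
fields.append(pa.field(name, pa_type, nullable=nullable, metadata=safe_metadata))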
From version 3.3, cryptography switched from compiled wheels for each Python version to the ABI3 format.
This, however, requires a fairly recent version of pip; pip==20.0 seemed to do the trick for me.
Otherwise, pip will fail to find a compiled wheel and will try to compile from source, which requires you to have OpenSSL installed.
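In practice that means upgrading pip before installing the package, for example:

pip install -U pip
pip install "delta-lake-reader[azure]"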
Azure Function - when trying to access a Delta table on our Azure storage account, I get an error when I call the DeltaTable class. The same code on PyCharm gives no error.
I need HELP!! Am I missing something???
I have the following defined in the requirements.txt
azure-functions
azure-identity
pyodbc
delta-lake-reader[azure]
Requirement already satisfied: delta-lake-reader[azure] in c:\working-folder\az-func\http-register-dta-source.venv\lib\site-packages (from -r requirements.txt (line 8)) (0.2.13)
import os

def readDeltaTableSchema(container_name: str, schema_name: str, delta_table: str) -> str:
    from deltalake import DeltaTable
    from adlfs import AzureBlobFileSystem

    VZNTDIRECTORYID = os.getenv('AZ_STORAGE_VZNTDIRECTORYID')
    VZNTID = os.getenv('AZ_STORAGE_VZNTID')
    VZNTSECRET = os.getenv('AZ_STORAGE_VZNTSECRET')
    az_account_name = os.getenv('AZ_STORAGE_STARBURST_ACCT')
    vznt_tenant_storage_account = os.getenv('AZ_STORAGE_STARBURST_ACCT')
    # note: no f-prefix here, so the placeholder below is never interpolated
    Storage_URL = "https://{vznt_tenant_storage_account}.dfs.core.windows.net"
    az_container = container_name
    az_schema = schema_name
    az_delta_table = delta_table
    url = f"abfss://{az_container}@{az_account_name}.dfs.core.windows.net/delta/{az_schema}/{az_delta_table}"
    fs = AzureBlobFileSystem(
        account_name=az_account_name, account_url=Storage_URL,
        client_id=VZNTID, client_secret=VZNTSECRET, tenant_id=VZNTDIRECTORYID
    )
    deltaTableSchemaMeta = DeltaTable(url, file_system=fs)  # --> fails here
Error:
[2022-12-20T18:27:16.855Z] Executed 'Functions.register-data-sourceHTTPTrigger' (Failed, Id=bbfa069d-d7c9-475c-860b-3e593a0e0378, Duration=16026ms)
[2022-12-20T18:27:16.858Z] System.Private.CoreLib: Exception while executing function: Functions.register-data-sourceHTTPTrigger. System.Private.CoreLib: Result: Failure
Exception: HttpResponseError: Operation returned an invalid status 'The specified resource name contains invalid characters.'
ErrorCode:InvalidResourceName
Stack: File "C:\ProgramData\chocolatey\lib\azure-functions-core-tools-3\tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 402, in _handle__invocation_request
    call_result = await self.loop.run_in_executor(
  File "C:\Program Files\Python39\lib\concurrent\futures\thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "C:\ProgramData\chocolatey\lib\azure-functions-core-tools-3\tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 606, in run_sync_func
    return ExtensionManager.get_sync_invocation_wrapper(context,
  File "C:\ProgramData\chocolatey\lib\azure-functions-core-tools-3\tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\extension.py", line 215, in raw_invocation_wrapper
    result = function(**args)
  File "C:\working-folder\az-func\http-register-dta-source\register-data-sourceHTTPTrigger\__init__.py", line 30, in main
    storage_access()
  File "C:\working-folder\az-func\http-register-dta-source\register-data-sourceHTTPTrigger\__init__.py", line 62, in storage_access
    val = readDeltaTableSchema(az_container, az_schema, az_delta_table)
  File "C:\working-folder\az-func\http-register-dta-source\register-data-sourceHTTPTrigger\__init__.py", line 91, in readDeltaTableSchema
    deltaTableSchemaMeta = DeltaTable(url, file_system=fs)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\deltalake\deltatable.py", line 40, in __init__
    if not self._is_delta_table():
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\deltalake\deltatable.py", line 62, in _is_delta_table
    return self.filesystem.exists(f"{self.log_path}")
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\adlfs\spec.py", line 1292, in exists
    return sync(self.loop, self._exists, path)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\fsspec\asyn.py", line 71, in sync
    raise return_result
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\fsspec\asyn.py", line 25, in _runner
    result[0] = await coro
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\adlfs\spec.py", line 1314, in _exists
    if await bc.exists():
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\azure\core\tracing\decorator_async.py", line 79, in wrapper_use_tracer
    return await func(*args, **kwargs)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 652, in exists
    process_storage_error(error)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\azure\storage\blob\_shared\response_handlers.py", line 185, in process_storage_error
    exec("raise error from None")  # pylint: disable=exec-used # nosec
  File "<string>", line 1, in <module>
See:
My project depends on delta-lake-reader, and I cannot upgrade pyarrow to a version which mitigates the above vulnerability:
ERROR: Cannot install delta-lake-reader==0.2.15 and pyarrow==14.0.1 because these package versions have conflicting dependencies.
The conflict is caused by:
The user requested pyarrow==14.0.1
delta-lake-reader 0.2.15 depends on pyarrow<10.0.0 and >=9.0.0
I am trying to host an app online that has to read blobs in deltalake format from Azure Blob Storage. To do this I use:
AzureBlobFileSystem from adlfs
DeltaTable from deltalake
When hosting the website (with Azure App Service), everything works well: given input, the app reads the data from Azure Blob Storage and returns the correct tables.
The problem occurs when the app has been online and unused for several hours: when the app tries to read from the deltalake storage, it fails and raises an HttpResponseError with a traceback that is not clear enough for me to understand what the actual issue is. When I restart the app, everything works fine again until a few hours have passed: the same bug keeps returning.
I am not sure if the problem occurs in the adlfs package or the deltalake package. I hope that someone here can help me understand where it is failing and why, and knows a solution to the problem!
Thanks in advance
HttpResponseError: The range specified is invalid for the current size of the resource.
RequestId:2988ea13-601e-0083-40f7-47b68d000000 Time:2022-04-04T07:42:05.9077599Z
ErrorCode:InvalidRange
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>InvalidRange</Code><Message>The range specified is invalid for the current size of the resource.
RequestId:2988ea13-601e-0083-40f7-47b68d000000 Time:2022-04-04T07:42:05.9077599Z</Message></Error>
Traceback:
File "/opt/venv/lib/python3.9/site-packages/streamlit/script_runner.py", line 379, in _run_script
exec(code, module.__dict__)
File "/app/src/incasso/dashboard_form.py", line 95, in <module>
eenheid = find_eenheidnum(deelcontractnummer, fs).iloc[0]
File "/opt/venv/lib/python3.9/site-packages/sklego/pandas_utils.py", line 81, in wrapper
result = func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/incasso/dashboard_functions.py", line 29, in find_eenheidnum
DeltaTable("20-silver/edh/woc_contracten", file_system=fs)
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 45, in __init__
self._as_newest_version()
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 151, in _as_newest_version
self._apply_partial_logs(version=self.checkpoint + 9)
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 113, in _apply_partial_logs
for line in log:
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1616, in __next__
out = self.readline()
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1613, in readline
return self.readuntil(b"\n")
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1596, in readuntil
part = self.read(blocks or self.blocksize)
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1565, in read
out = self.cache._fetch(self.loc, self.loc + length)
File "/opt/venv/lib/python3.9/site-packages/fsspec/caching.py", line 154, in _fetch
self.cache = self.fetcher(start, end) # new block replaces old
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 91, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 71, in sync
raise return_result
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 25, in _runner
result[0] = await coro
File "/opt/venv/lib/python3.9/site-packages/adlfs/spec.py", line 1804, in _async_fetch_range
stream = await self.container_client.download_blob(
File "/opt/venv/lib/python3.9/site-packages/azure/core/tracing/decorator_async.py", line 74, in wrapper_use_tracer
return await func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_container_client_async.py", line 1000, in download_blob
return await blob_client.download_blob(
File "/opt/venv/lib/python3.9/site-packages/azure/core/tracing/decorator_async.py", line 74, in wrapper_use_tracer
return await func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 480, in download_blob
await downloader._setup() # pylint: disable=protected-access
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_download_async.py", line 250, in _setup
self._response = await self._initial_request()
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_download_async.py", line 336, in _initial_request
process_storage_error(error)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/_shared/response_handlers.py", line 181, in process_storage_error
exec("raise error from None") # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
Hi,
My Databricks pipeline writes data to a Delta share on an hourly basis. I am reading this from an Azure Function app using this Python library. However, the function app is not reading the latest data; it keeps referring to an older version, and when I list the files it does not list all the parquet files in storage.
fs = AzureBlobFileSystem(account_name=account_name, credential=account_key)
dt = DeltaTable(azurepath, file_system=fs)
print(dt.version)
print(dt.files)
Is there any setting I need so that it refers to the latest version of the data?
Regards,
Sripathi
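A hedged guess at an explanation, based on the tracebacks elsewhere on this page: the table version is resolved once, in DeltaTable.__init__ (which calls _as_newest_version), so an instance cached across function invocations keeps serving the version it saw at construction time. A sketch of reading with a fresh instance per call, reusing the names from the snippet above:

def read_latest(azurepath, fs):
    # re-resolves the newest table version on every call
    dt = DeltaTable(azurepath, file_system=fs)
    return dt.to_pandas()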
from deltalake import DeltaTable
from s3fs import S3FileSystem
import pyarrow.dataset as ds

fs = S3FileSystem()  # authenticate using environment variables, in this example
df = DeltaTable(f"s3://{DATA_BUCKET}/{RES_PATH}", file_system=fs)
res = df.to_table(filter=(ds.field("s_uid") == str(s_uid)) & (ds.field("r_uid") == str(r_uid))).to_pandas()
I am reading a Delta table from AWS S3 using the code above,
and I did three writes to the Delta table;
Now, when I read the Delta table it only shows 4 columns, when it should show all 5 columns.
Any help?
When installing the azure, aws or gcp optional dependencies, the installation fails to build the wheels for aiohttp, multidict, yarl and some others.
This is due to an older version of pip.
Update pip using pip install -U pip before installing to fix this.
pip>=20 is also set as a dependency, so running the installation twice should also fix any issues related to old pip versions.
In #39, a new piece of code was added to raise an error when _last_checkpoint is not present.
But since that change, I can't read any Delta table before at least 10 versions of the table have been added, as the _last_checkpoint file is only created after at least 10 versions.
Would it be better to instead look for *.json or some other file to validate that it's a Delta table?
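A minimal sketch of that suggestion, assuming fs is an fsspec filesystem (fs.exists and fs.glob are standard fsspec methods) and log_path points at _delta_log:

def is_delta_table(fs, log_path):
    # any JSON commit file identifies a Delta table, even before
    # the first _last_checkpoint has been written
    return fs.exists(log_path) and len(fs.glob(f"{log_path}/*.json")) > 0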
I have a dataset that I unfortunately can't share; however, it is hosted on S3.
I can load the data in using
delta_table_path = 's3://my/delta/path'
df = DeltaTable(delta_table_path, file_system=fs).to_pandas()
This comes across with the correct column names and seemingly the correct row count; however, all of the data in the dataset is null, which should not be the case, because the same data is picked up in Spark and generates output tables.
Sorry for the vague report; I'm just looking for some advice, or to hear whether this is a known issue.
Hi @jeppe742, when querying the delta lake with the command below, I am getting NaN values for random rows in the dataframe. There is no specific pattern to which rows show NaN. Querying the same data with pyspark has no issues.

import pyarrow.dataset as ds
from datetime import datetime

df = DeltaTable('path/to/table', file_system=fs).to_table(columns=["col1", "col2", ...], filter=(ds.field("year") == datetime.now().year) & (ds.field("month") == datetime.now().month) & (ds.field("day") == str(datetime.now().day))).to_pandas()
Please advise.
The function _apply_partial_logs splits a log of multiple JSON lines on any whitespace character. But when a whitespace character occurs inside a string field, the log is split into invalid JSON parts, breaking the parsing of the log.
Example value that will break the parser (this is the raw string with escape characters included):
b'{"commitInfo":{"timestamp":1614330479493,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[\"Feature\",\"Period\"]","predicate":"Feature = 'xxx' AND Period = '1'"},"readVersion":49,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"667","numOutputRows":"2"}}}\n{"add":{"path":"Feature=xxx/Period=1/part-00000-b6191164-a8d8-4b53-9b6b-fb04a55bb5d8.c000.snappy.parquet","partitionValues":{"Feature":"xxx","Period":"1"},"size":667,"modificationTime":1614330479064,"dataChange":true}}\n'
Splitting on newline only (\n) would solve this issue, but I don't know if that will have undesired side effects. I will see what I can do :).
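A minimal sketch of the proposed fix, assuming log_bytes holds the raw bytes of one commit file like the example above:

import json

# split on newlines only, so whitespace inside JSON string values survives
entries = [json.loads(line) for line in log_bytes.split(b"\n") if line.strip()]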
Prior to commit:
Traceback (most recent call last):
  File "/Users/stevenrojano/Desktop/DeltaLakeReader/main.py", line 15, in <module>
    main()
  File "/Users/stevenrojano/Desktop/DeltaLakeReader/main.py", line 10, in main
    delta_table = DeltaTable(delta_table_path, file_system=fs)
  File "/Users/stevenrojano/Desktop/DeltaLakeReader/deltalake/deltatable.py", line 45, in __init__
    self._as_newest_version()
  File "/Users/stevenrojano/Desktop/DeltaLakeReader/deltalake/deltatable.py", line 144, in _as_newest_version
    self._apply_from_checkpoint(checkpoint_info["version"])
  File "/Users/stevenrojano/Desktop/DeltaLakeReader/deltalake/deltatable.py", line 87, in _apply_from_checkpoint
    self.schema = schema_from_string(schema_string)
  File "/Users/stevenrojano/Desktop/DeltaLakeReader/deltalake/schema.py", line 17, in schema_from_string
    fields.append(pa.field(name, pa_type, nullable=nullable, metadata=metadata))
  File "pyarrow/types.pxi", line 1759, in pyarrow.lib.field
  File "pyarrow/types.pxi", line 1013, in pyarrow.lib.ensure_metadata
  File "pyarrow/types.pxi", line 915, in pyarrow.lib.KeyValueMetadata.__init__
  File "stringsource", line 15, in string.from_py.__pyx_convert_string_from_py_std__in_string
TypeError: expected bytes, int found