paillave / etl.net
Mass processing data with a complete ETL for .NET developers
Home Page: https://paillave.github.io/Etl.Net/
License: MIT License
Originally posted by LordBigDuck November 10, 2022
Hello, I have gone through the documentation and source code but didn't manage to write working code to read an XML file. Could you provide some samples?
My user uploads a .csv file and I parse it with Etl.Net. Inside the Do section, I am using a validation package with a specialized rule set for the data. However, when an edited version of the same file is uploaded, the results seem to come from the previous file version. I think the file is being kept in memory. Is there a way with Etl.Net to dispose of the file after the Do section? Thank you!
```csharp
return StreamProcessRunner.CreateAndExecuteAsync(file,
    t => t
        .Select("Use uploaded file", i => FileValue.Create(i.OpenReadStream(), i.FileName, "Posted to API"))
        .CrossApplyTextFile("parse file", FlatFileDefinition.Create(c => new AddMerchantUploadFileModel()
        {
            // ... cut out for privacy
        }).IsColumnSeparated(','))
        .Do("validate", f => ValidateMerchantFile(f, addedMerchants, errors))
);
```
Hi,
I'm trying to use the `SimpleConsoleExecutionDisplay`. The documentation uses a parameterless constructor, but the class actually has no default ctor. Passing `default` leads to a `NullReferenceException`. Passing `new()` for each parameter seems OK, but I don't know what is expected.
Moreover, the documentation says to use the `Initialize()` method with a `structure` variable (of type `JobDefinitionStructure`), but I have no idea where to get that "structure". I've passed `StreamProcessRunner.GetDefinitionStructure()`; is that correct?
Thanks in advance :)
Hi. The library looks great.
Is it possible to provide a FileStream myself (from an ASP.NET Core action) and process it?
And is it possible to parse JSON files?
By the way, are there any pitfalls in using this library directly in an ASP.NET Core web app with EF Core?
When trying to use the SFTP provider to get files, I ran into a possible bug: `SftpFileValue` uses `Path.Combine` in the `GetContent` method to build the filepath passed to the SFTP server. However, if there's no trailing path delimiter, i.e. `Path.Combine("SomeRoot/content", "filename.txt")`, then `Path.Combine` uses the default separator of the machine it is running on. This can lead to a Linux SFTP server receiving a filepath with mixed forward and back slashes, and Renci.SshNet throws a 'No such file' exception. That sent me down a lot of wrong turns, since I assumed there was something wrong with the SFTP server or the authentication, given that the file clearly existed.
There is an easy workaround: make sure there are trailing slashes in the path. But that makes `Path.Combine` redundant in `SftpFileValue`, because it's just concatenating the strings at that point, and the error message as it stands is not helpful in tracking down the issue (especially since the filepath isn't included in the exception, and the path fragments seem sensible on their own; indeed, they work to get a list of the files at that location!)
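The mixed-separator behaviour is easy to reproduce with the BCL alone. A minimal sketch of a remote-safe join; `CombineRemote` is a hypothetical helper for illustration, not part of Etl.Net:

```csharp
using System;
using System.IO;

// Path.Combine uses the separator of the machine the code runs on, which is
// wrong for a remote SFTP server that always expects '/'. A remote-safe join
// can normalize to forward slashes instead.
static string CombineRemote(string root, string name) =>
    root.TrimEnd('/') + "/" + name.TrimStart('/');

// On Windows this prints "SomeRoot/content\filename.txt" (mixed separators);
// on Linux it happens to print the expected forward-slash path.
Console.WriteLine(Path.Combine("SomeRoot/content", "filename.txt"));

// The remote-safe join always produces "SomeRoot/content/filename.txt".
Console.WriteLine(CombineRemote("SomeRoot/content", "filename.txt"));
```

Normalizing explicitly also makes the intent visible at the call site, instead of depending on the OS the job happens to run on.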
Preferably I'm looking for a tool similar to Dagster, Prefect, or Airflow, which are popular tools in the python ecosystem that allow users to express data pipelines in the form of DAGs and have them be recurring, containerizable etc. Are there plans to expose that sort of functionality in this project? Thanks.
Hi all,
I've been using the `EfCoreSave` operator with the `SaveMode.EntityFrameworkCore` option enabled, as I'm planning to write my data to Postgres.
I've noticed that when dealing with large streams, this operator creates a bottleneck for two reasons:
I've changed a local copy to look as follows:
```csharp
var ret = args.SourceStream.Observable
    .Chunk(args.BatchSize)
    .Map(i => i.Select(j => (Input: j, Entity: args.GetEntity(j))).ToList())
    .Do(i =>
    {
        var dbContextFactory = args.KeyedConnection == null
            ? this.ExecutionContext.DependencyResolver.Resolve<IDbContextFactory<TMyContext>>()
            : this.ExecutionContext.DependencyResolver.Resolve<IDbContextFactory<TMyContext>>(args.KeyedConnection);
        using (var dbContext = dbContextFactory.CreateDbContext())
        {
            ProcessBatch(i, dbContext, args.BulkLoadMode);
        }
    })
```
This allows multiple streams to write to the database at the same time, and crucially, disposes the DbContext after each operation (considerably improving performance/memory overhead in the change tracker).
On my sample of 500k+ entries the import is now silky smooth.
Was there a deliberate design decision behind this?
Hi,
Is an Oracle provider supported?
Originally posted by franks1 August 9, 2022
I have gone through the samples and tutorial of your extraordinary library (Etl.Net). I would like you to assist me with sample code for reading Excel files. Thank you
Hi,
I'm using the library as shown in the samples:
```csharp
contextStream.CrossApply("Get Files", new FtpFileValueProvider("SRC", "Solar exports", "files from ftp", connectionParams, providerParams))
    .Do("print files to console", i => Console.WriteLine(i.Name));
```
File names are printed correctly, but I always get a NotImplementedException at the end of the execution.
It seems to be linked to `FileValueProviderBase.TypeName`, which explicitly throws a NotImplementedException.
The following change attempt has fixed the issue in my local copy:
```csharp
namespace Paillave.Etl.Core
{
    public abstract class FileValueProviderBase<TConnectionParameters, TProviderParameters> : IFileValueProvider, IValuesProvider<object, IFileValue>
    {
        [...]
        public string TypeName => this.GetType().Name; // instead of: throw new NotImplementedException()
```
Am I using the FTP provider the wrong way, or should this be fixed?
Thanks in advance,
Nicolas
I am trying to read data from one SQL Server DB and insert it into another. Is this supported? I'm wondering how I can specify a connection for the SqlServerSave method.
Thanks in advance.
Hi again :-)
It seems that an issue is hiding in the Substract operator on non-ordered streams.
Considering the following example:
```csharp
var stream1 = contextStream
    .CrossApply("create values from enumeration", ctx => Enumerable.Range(1, 100)
        .Select(i => new { Id = i, Label = $"Label{i}" }));
var stream2 = contextStream
    .CrossApply("create values from enumeration2", ctx => Enumerable.Range(1, 8)
        .Select(i => new { Id = i, Label = $"OtherLabel{i}" }));
var res = stream1.Substract("merge with stream 2", stream2, i => i.Id, i => i.Id)
    .Do("print console", i => Console.WriteLine(i.Label));
```
The previous code produces only 1 row (`Label100`) instead of 92 rows.
There is no issue when using ordered streams and the method `public static IStream<TInLeft> Substract<TInLeft, TInRight, TKey>(this ISortedStream<TInLeft, TKey> leftStream, string name, ISortedStream<TInRight, TKey> rightStream)`.
(version 2.0.23)
Is there a way to specify the SQL command execution timeout for `.CrossApplySqlServerQuery` and `.ToSqlCommand`?
I'm trying to run a stored proc using `.CrossApplySqlServerQuery` and consume the result set it returns, and it fails with a timeout.
As I found out by googling, the default `CommandTimeout` for `SqlConnection` and `OdbcConnection` is 30 seconds, which is not enough.
Is there a way to work around this? How do you solve this in your production environments? I suspect not all queries/stored proc calls can be completed in under 30 seconds.
Is there a way to use a DB connector other than EF Core and SqlClient to access databases? We have issues using any of the implemented ones because the database we are targeting is SQL Server 2000.
Can you execute the `ExecuteNonQueryAsync` in a transaction when one is provided by the database?
Reason: I try to execute the Delete (EF Core) operator within a transaction scope, but it throws an error due to this command.
Setting its `Transaction` property based on the Database should fix this.
Originally posted by @TomatorCZ in #292 (comment)
Hello, nice package you have here for ETL in .NET!
I have a question regarding database streams:
Suppose I need to define two streams for two different tables from the same ODBC database connection:
```csharp
var arch1 = contextStream
    .CrossApplySqlServerQuery("first query from source 1", o => o
        .FromQuery("select * from dbo.carr")
        .WithMapping(i => new
        {
            carr_code = i.ToColumn("carr_code"),
            carr_name = i.ToColumn("carr_name")
        }), "source1")
    .Select("create row to save source1 from first query", i => new { i.carr_name, i.carr_code });

var arch2 = contextStream
    .CrossApplySqlServerQuery("second query from source 1", o => o
        .FromQuery("select * from dbo.carr_old")
        .WithMapping(i => new
        {
            carr_code_old = i.ToColumn("carr_code"),
            carr_name_old = i.ToColumn("carr_name")
        }), "source1")
    .Select("create row to save source1 from second query", i => new { i.carr_name_old, i.carr_code_old });
```
When executing this I get this error from the ODBC driver:
```
Unhandled exception. Paillave.Etl.Core.JobExecutionException: Job execution failed
 ---> System.Data.Odbc.OdbcException (0x80131937): ERROR [HY000] [Microsoft][ODBC SQL Server Driver]Connection is busy with results for another hstmt
```
This implies that the cursor for the connection has to be looped through to completion before being able to get another stream. Is there any way that this could be remedied? The source database is an old SQL Server 2000, so we're forced to use ODBC.
Thank you.
BTW, it works if I output one stream to a file and then define a new stream sequentially like this:
```csharp
var arch = contextStream
    .CrossApplySqlServerQuery("query from source 1", o => o
        .FromQuery("select * from dbo.carr")
        .WithMapping(i => new
        {
            carr_code = i.ToColumn("carr_code"),
            carr_name = i.ToColumn("carr_name")
        }), "source1")
    .Select("create row to save source1", i => new { i.carr_name, i.carr_code })
    .ToTextFileValue("to file for source1", @"C:\temp\carr_out_arch.csv",
        FlatFileDefinition.Create(f => new { carr_name = f.ToColumn("Name"), carr_code = f.ToColumn("Code") })
            .IsColumnSeparated('|'))
    .WriteToFile("save to source1 output file", i => i.Name)
    .CrossApplySqlServerQuery("query from source 2", o => o
        .FromQuery("select * from dbo.carr_old")
        .WithMapping(i => new
        {
            carr_code_old = i.ToColumn("carr_code"),
            carr_name_old = i.ToColumn("carr_name")
        }), "source1")
    .Select("create row to save source2", i => new { i.carr_name_old, i.carr_code_old })
    .ToTextFileValue("to file for source2", @"C:\temp\carr_out_arch2.csv",
        FlatFileDefinition.Create(f => new { carr_name_old = f.ToColumn("Name"), carr_code_old = f.ToColumn("Code") })
            .IsColumnSeparated('|'))
    .WriteToFile("save to source2 output file", i => i.Name);
```
Hi, I am attempting to use `SqlServerSave`, however it fails for me because, I believe, it does not internally enclose each property/column name in square brackets. I have a column called "Key", which is also a reserved SQL Server keyword and must be enclosed in brackets (i.e. [Key]), otherwise the generated SQL statement fails.
Hi again :-)
Sorry for bothering you so often these last few days, but I've found another glitch, along with its workaround. I'm only trying to help 😳
I use EF Core to save records in a PostgreSQL DB with `.WithMode(SaveMode.EntityFrameworkCore)`. My own `DataContext` derives from `Microsoft.EntityFrameworkCore.DbContext`.
If I try to inject my data context without explicitly casting it to `DbContext`, I get an `ArgumentNullException` with the following message: Value cannot be null. (Parameter 'key')
The exception is thrown by `Paillave.Etl.EntityFrameworkCore.EfCoreSaveStreamNode<TInEf, TIn, TOut>.CreateOutputStream`:
```csharp
protected override IStream<TOut> CreateOutputStream(EfCoreSaveArgs<TInEf, TIn, TOut> args)
{
    var ret = args.SourceStream.Observable
        .Chunk(args.BatchSize)
        .Map(i => i.Select(j => (Input: j, Entity: args.GetEntity(j))).ToList())
        .Do(i =>
        {
            /*** Thrown by the following lines => ***/
            var dbContext = args.KeyedConnection == null
                ? this.ExecutionContext.DependencyResolver.Resolve<DbContext>()
                : this.ExecutionContext.DependencyResolver.Resolve<DbContext>(args.KeyedConnection);
            this.ExecutionContext.InvokeInDedicatedThreadAsync(dbContext, () => ProcessBatch(i, dbContext, args.BulkLoadMode)).Wait();
        })
        .FlatMap((i, ct) => PushObservable.FromEnumerable(i, ct))
        .Map(i => args.GetOutput(i.Input, i.Entity));
    return base.CreateUnsortedStream(ret);
}
```
Explicitly casting to DbContext when registering the instance fixes the issue:
```csharp
public class DataContext : DbContext
{
    [...]
}

[...]

using var dc = new DataContext();
var options = new ExecutionOptions<string>
{
    // This works
    Resolver = new SimpleDependencyResolver().Register<DbContext>(dc),
    // This doesn't work
    Resolver = new SimpleDependencyResolver().Register(dc)
};
var res = await processRunner.ExecuteAsync("", options);
```
Originally posted by eisbaer66 August 21, 2022
Can I use async/await inside of `.Select(...)`, `.CrossApply(...)`, and `.Do(...)`?
Do I have to call `.Result`/`.Wait()` myself? Is this safe?
For example, calling a REST API and reading the content as a string:
```csharp
stream.Select("get content", async context => await context.httpClient.GetStringAsync(context.url));
```
Hi @paillave, do you have plans to support HL7?
Originally posted by mlholt03 October 17, 2022
If I am accessing a database using this tool and I wish to write unit tests for my code, how can I mock out the database access so that my unit test is not actually attempting to connect to a database? Is there some way to provide a `SqlCommandValueProvider` instance that simply returns a basic sample set of data without querying a database?
I tried to use ToExcelFile, but I'm not sure how to use it (it throws an error).
Hi,
is there a way to use select with batches, without loading all the data from the server into memory at once?
Originally posted by jignesh-dalal June 9, 2023
Input Stream 1:
ID,Name
1,Danny
2,Fred
3,Sam
Input Stream 2:
ID,name
1,Danny
3,Pamela
4,Fernando
Output:
ID,name,Status
1,Danny,Unchanged
2,Fred,Deleted
3,Pamela,Changed
4,Fernando,New
Can this be achieved using Etl.Net?
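Etl.Net's own operators aside, the classification the question asks for is a full outer join on ID with a status per key. A minimal sketch of just that delta logic in plain LINQ (the dictionaries stand in for the two parsed streams; this is not an Etl.Net pipeline):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// The two input streams keyed by ID, taken from the example above.
var stream1 = new Dictionary<int, string> { [1] = "Danny", [2] = "Fred", [3] = "Sam" };
var stream2 = new Dictionary<int, string> { [1] = "Danny", [3] = "Pamela", [4] = "Fernando" };

// Full outer join on ID: a key present only on the right is New, only on
// the left is Deleted, and on both sides it is Unchanged or Changed by value.
var result = stream1.Keys.Union(stream2.Keys).OrderBy(id => id)
    .Select(id =>
    {
        bool inOld = stream1.TryGetValue(id, out var oldName);
        bool inNew = stream2.TryGetValue(id, out var newName);
        string status = !inOld ? "New"
                      : !inNew ? "Deleted"
                      : oldName == newName ? "Unchanged"
                      : "Changed";
        return (Id: id, Name: newName ?? oldName, Status: status);
    })
    .ToList();

foreach (var row in result)
    Console.WriteLine($"{row.Id},{row.Name},{row.Status}");
// 1,Danny,Unchanged
// 2,Fred,Deleted
// 3,Pamela,Changed
// 4,Fernando,New
```

Inside Etl.Net the same shape would presumably be expressed with its join/lookup operators, but the status rules themselves stay exactly as above.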
Could you provide an example of ETL from an uploaded file to a Web API? Let's say the file was a .csv file. I do not see how to do this within your documentation.
Thank you very much!
Hello,
is there a way to get an in-memory list as an instantiated `IList<T>` from a SQL Server stream and have the database cursor consumed/closed?
At first I thought the extension method `ToList()` was for this purpose, but it appears it returns an `ISingleStream<List<T>>`.
```csharp
var inMemoryCarrierList = contextStream
    .CrossApplySqlServerQuery("carrier stream", builder => builder
        .FromQuery("select carr_code, carr_name from dbo.carr_old order by carr_code")
        .WithMapping(m => new
        {
            carr_code = m.ToColumn("carr_code"),
            carr_name = m.ToColumn("carr_name")
        }), "source1")
    .ToList("in memory carrier list");
```
If I get a list this way and I don't consume this stream by, say, outputting to a text file and then I try a query like this on the same ODBC connection:
```csharp
var arch1 = contextStream
    .CrossApplySqlServerQuery("first query from source 1", o => o
        .FromQuery("select * from dbo.carr")
        .WithMapping(i => new
        {
            carr_code = i.ToColumn("carr_code"),
            carr_name = i.ToColumn("carr_name")
        }), "source1")
    .Select("create row to save source1 from first query", i => new { i.carr_name, i.carr_code });
```
I get this error:
```
Unhandled exception. Paillave.Etl.Core.JobExecutionException: Job execution failed
 ---> System.Data.Odbc.OdbcException (0x80131937): ERROR [HY000] [Microsoft][ODBC SQL Server Driver]Connection is busy with results for another hstmt
```
Originally posted by lvjoemcintyre July 20, 2022
Are there plans for MySQL support?
I started trying to modify the SqlServer project to support MySQL.
I'm bumping into some trouble understanding how to debug it.
`.SqlServerSave()` currently generates SQL for INSERTs/UPDATEs using the `OUTPUT INSERTED.*` clause, which is not supported by SQL Server versions prior to 2005.
I know this is legacy stuff, but ETL.NET would be very useful for migrating the necessary scenarios from ancient versions like SQL Server 2000.
Could the query be adjusted? It would suffice to execute an additional SELECT query like `SELECT * FROM <table> WHERE col1=@val1 AND col2=@val2 AND ... coln=@valn` to achieve the behaviour of `OUTPUT INSERTED.*`. I guess positional arguments would need to be used here as well to support connections from ODBC/OLE DB drivers.
This could probably be implemented by creating a new node, e.g. `SqlServer2000SaveStreamNode.cs`, and adding a new extension method, say `.ToSqlServer2000()`.
Do you currently allow for spatial joins / lookups as part of the ETL process pipeline? And if not, are you considering support for it?
How can we handle a pipe-separated CSV that has a text column with multiline text? The text delimiter is ".
With the given example in the documentation, it throws the error "could not deserialize value" because of the new line in the text column.
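For reference, a quote-aware reader has to treat a line break inside a quoted field as data rather than as a record separator. A minimal standalone sketch of that rule (independent of Etl.Net; whether its FlatFileDefinition can be configured for this is exactly the open question):

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Split pipe-separated records where fields may be quoted with " and may
// contain embedded newlines (RFC 4180 style quoting). Illustration only.
static List<List<string>> ParseRecords(string text, char sep = '|')
{
    var records = new List<List<string>>();
    var record = new List<string>();
    var field = new StringBuilder();
    bool inQuotes = false;
    for (int i = 0; i < text.Length; i++)
    {
        char ch = text[i];
        if (inQuotes)
        {
            // "" inside a quoted field is an escaped quote; a single " closes it.
            if (ch == '"' && i + 1 < text.Length && text[i + 1] == '"') { field.Append('"'); i++; }
            else if (ch == '"') inQuotes = false;
            else field.Append(ch); // newlines inside quotes are kept as data
        }
        else if (ch == '"') inQuotes = true;
        else if (ch == sep) { record.Add(field.ToString()); field.Clear(); }
        else if (ch == '\n')
        {
            // Only an unquoted newline ends the record.
            record.Add(field.ToString().TrimEnd('\r'));
            field.Clear();
            records.Add(record);
            record = new List<string>();
        }
        else field.Append(ch);
    }
    if (field.Length > 0 || record.Count > 0) { record.Add(field.ToString()); records.Add(record); }
    return records;
}

var recs = ParseRecords("a|\"line1\nline2\"|b\nc|d|e\n");
Console.WriteLine(recs.Count);   // 2 records, despite 3 physical lines
Console.WriteLine(recs[0][1]);   // the middle field spans two lines
```

If the parser in use splits on raw newlines before looking at quotes, the "could not deserialize value" error above is the expected symptom.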
Hi, can you read data from a source db and copy it to another, target db? I've read and tried the examples in the documentation, but read and save always happen inside the same database even though the documentation mentions 2 args parameters.
Please provide example code if it's possible.
The .ToTextFileValue extension method fails when the output is bigger than 2 GB. We worked around it with .Chunk() and then .Do(), but lost the elegance of using a file definition.
Is there a more elegant way to resolve this? For ETL processes it is important to be able to extract big data files.
I reviewed the documentation on calling stored procedures and SQL commands, and I would like to know: `ToSqlCommand` always returns the input events as is; `ToSqlCommand` has variable declarations like:
```sql
DECLARE @typeVar Char(1) = 'A';
SELECT * FROM dbo.myTable WHERE type = @typeVar;
```
How could we use @ within such a command? Isn't it reserved for parameter injection from the upstream? I checked the code, and it's a bit strange to use @ for params, as @ is used in MS SQL for variable declarations and @@ for system variables.
There is a problem with generating SQL, since ODBC and OLE DB connections do not support named parameters in INSERT/UPDATE queries.
The parameters need to be replaced with ? placeholders in the queries.
I've raised a sample PR showing a way this can be remedied: https://github.com/paillave/Etl.Net/pull/442/
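The rewrite such a change needs can be shown in isolation: replace each named `@param` with `?` while recording the binding order, and leave `@@` system variables untouched. A standalone sketch, not the PR's actual code:

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Convert a query with named parameters to ODBC/OLE DB positional form,
// recording the order in which values must be bound to the ? markers.
// The negative lookbehind skips @@ system variables such as @@ROWCOUNT.
static (string Sql, List<string> Order) ToPositional(string sql)
{
    var order = new List<string>();
    string rewritten = Regex.Replace(sql, @"(?<!@)@(\w+)", m =>
    {
        order.Add(m.Groups[1].Value);
        return "?";
    });
    return (rewritten, order);
}

var (sql, order) = ToPositional("UPDATE dbo.carr SET carr_name=@name WHERE carr_code=@code");
Console.WriteLine(sql);                      // UPDATE dbo.carr SET carr_name=? WHERE carr_code=?
Console.WriteLine(string.Join(",", order));  // name,code
```

Because ? markers are purely positional, the recorded order is what the save node would use to add parameters to the command in the right sequence.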
MySQL PostgreSQL SQLite support
Let's say we have a data pipeline like this:
```csharp
var stream = contextStream
    .ToSqlCommand("Create tables", Queries.CreateTables)
    .CrossApplyFolderFiles("Get data file", "data.csv", true)
    .CrossApplyTextFile("Parse data", FlatFileDefinition.Create(
        i => new
        {
            code = i.ToColumn(0),
            name = i.ToColumn(1)
        }).IsColumnSeparated(','))
    .SqlServerSave("Populate data table", o => o
        .SeekOn(i => i.code)
        .ToTable("[dbo].[some_table]"))
    .ToSqlCommand("Run a stored procedure", Queries.RunStoredProcedure);
```
A CSV is parsed, its records are saved to a table in a SQL Server database, and then I want to run a stored procedure which depends on that data.
If I run this and the CSV file contains 5 rows, the stored procedure is run 5 times, while I only need to run it once.
I tried to create a second data pipeline to run the stored procedure like this:
```csharp
var stream = contextStream
    .ToSqlCommand("Create tables", Queries.CreateTables)
    .CrossApplyFolderFiles("Get data file", "data.csv", true)
    .CrossApplyTextFile("Parse data", FlatFileDefinition.Create(
        i => new
        {
            code = i.ToColumn(0),
            name = i.ToColumn(1)
        }).IsColumnSeparated(','))
    .SqlServerSave("Populate data table", o => o
        .SeekOn(i => i.code)
        .ToTable("[dbo].[some_table]"));

contextStream
    .ToSqlCommand("Run a stored procedure", Queries.RunStoredProcedure);
```
but when I execute it, the stored procedure is not run after the whole CSV import is complete and the rows are stored in the table; it runs somewhere in between, and the stored procedure can't get the data it needs since the data has not been saved yet.
How can I signal to the pipeline that I need to run the stored procedure just once, at the end?
I tried to create a separate DefineProcess method for running the SP, but that seems really clumsy.
What would you advise?
Thank you.
Hi,
Is there a way to do transaction processing in SQL?
Is there a way to skip rows that failed and continue inserting the next rows?
In my case, the source CSV data file and header content/types change often; they are basically unknown. While parsing, or before parsing, I need to first discover the data types of the data in the columns.
I just need to load and save the data into a SQL table based on whatever data and data types there are in the CSV file. Can I auto-load the CSV data and data types into the DB automatically with ETL.NET? I saw your expando example but was a little lost on how to achieve this high-level task.
For example, at a high level, how would I achieve this algorithm in your lib?
```
// Scan first 9 rows to discover the data types in the CSV file columns
var etlParsedFile = ETL.Paillave.Open/parseCSV(someUnknownData.csv).ScanRows(9)
foreach (var col in etlParsedFile)
    dataTable.AddColumn(etlParsedFile.GetNextHeaderCol)
CreateNewSqlServerTable
LoadDataToSqlTable??
```
thanks
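Whatever Etl.Net offers here, the "scan the first 9 rows" step itself can be sketched in plain C#: try parses from the most to the least specific type per column. `InferColumnType` and the inline sample data are illustrative assumptions, not Etl.Net API:

```csharp
using System;
using System.Globalization;
using System.Linq;

// Infer a SQL-ish type for one CSV column by testing its sample values,
// from most to least specific. Illustration only; not part of Etl.Net.
static string InferColumnType(string[] samples)
{
    if (samples.All(s => int.TryParse(s, out _))) return "INT";
    if (samples.All(s => decimal.TryParse(s, NumberStyles.Number, CultureInfo.InvariantCulture, out _))) return "DECIMAL(18,4)";
    if (samples.All(s => DateTime.TryParse(s, CultureInfo.InvariantCulture, DateTimeStyles.None, out _))) return "DATETIME";
    return $"NVARCHAR({Math.Max(samples.Max(s => s.Length), 1)})";
}

// Scan the first 9 data rows of an in-memory CSV (header line + data lines).
string[] lines =
{
    "code,amount,when,comment",
    "1,10.5,2023-06-09,first",
    "2,11.25,2023-06-10,second",
};
var headers = lines[0].Split(',');
var rows = lines.Skip(1).Take(9).Select(l => l.Split(',')).ToArray();

for (int c = 0; c < headers.Length; c++)
    Console.WriteLine($"{headers[c]} -> {InferColumnType(rows.Select(r => r[c]).ToArray())}");
// code -> INT, amount -> DECIMAL(18,4), when -> DATETIME, comment -> NVARCHAR(6)
```

The inferred types could then feed a generated CREATE TABLE statement before loading the data; how to plug that into Etl.Net's expando support is the question above.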
Hello, I'm working on a synchronisation between 2 databases.
In the DefineProcess I read data from EF and insert it with the SqlServerSave method.
If I have only one SqlServerSave in the DefineProcess, it works fine.
However, if I try to update more than one table with multiple SqlServerSave calls (from different streams), it throws exceptions like:
There is already an open DataReader associated with this Command which must be closed first
Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding.
After some research on the exceptions, I ended up setting MultipleActiveResultSets=true on the connection string I'm doing the upserts on.
With the MARS attribute, it now works fine with multiple SqlServerSave calls.
Is there a way to achieve the same thing without setting this attribute?
Or is it supposed to work only with the MARS attribute on?
Thank you
Originally posted by ctrlaltdan May 16, 2023
Hi, I was wondering if there is any support for a Buffer operator (specifically mirroring the `Buffer(timeSpan, count)` Rx operator).
The use case is to parse and load chunked output onto a service bus queue.
In the initial stage of our data processing pipeline, we verify the presence of a predefined set of files. If any of these files are missing, we need to raise an error. Is there a recommended practice for triggering errors, such as invoking a specific method, or is raising an exception the sole approach for handling this situation? Thank you