In the Distributions examples, the documentation mentions that a return_delay column is used and then omitted, but that column is missing from the actual code example. The code example is currently:
from pyspark.sql.types import IntegerType
import dbldatagen as dg
row_count = 1000 * 100
testDataSpec = (dg.DataGenerator(spark, name="test_data_set1", rows=row_count,
partitions=4, randomSeedMethod='hash_fieldname',
verbose=True)
.withColumn("purchase_id", IntegerType(), minValue=1000000, maxValue=2000000)
.withColumn("product_code", IntegerType(), uniqueValues=10000, random=True)
.withColumn("purchase_date", "date",
data_range=dg.DateRange("2017-10-01 00:00:00",
"2018-10-06 11:55:00",
"days=3"),
random=True)
.withColumn("return_date", "date",
expr="date_add('purchase_date', cast(floor(rand() * 100 + 1) as int))")
)
dfTestData = testDataSpec.build()
I tried the following two examples, neither of which worked, but I am not sure whether they are what the example was trying to show:
Example 1:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import expr
import dbldatagen as dg
row_count = 1000 * 100
testDataSpec = (dg.DataGenerator(spark, name="test_data_set1", rows=row_count,
partitions=4, randomSeedMethod='hash_fieldname',
verbose=True)
.withColumn("purchase_id", IntegerType(), minValue=1000000, maxValue=2000000)
.withColumn("product_code", IntegerType(), uniqueValues=10000, random=True)
.withColumn("purchase_date", "date",
data_range=dg.DateRange("2017-10-01 00:00:00",
"2018-10-06 11:55:00",
"days=3"),
random=True)
.withColumn("return_delay", IntegerType(), values=[-1, -2, -3], weights=[9, 2, 1],
random=True, omit=True)
.withColumn("return_date", "date", expr="date_add('purchase_date', 'return_delay')")
)
dfTestData = testDataSpec.build()
display(dfTestData)
Error 1:
AnalysisException: The second argument of 'date_add' function needs to be an integer.
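For what it's worth, the quotes appear to be the cause of Error 1: in Spark SQL, single-quoted names are string literals, so date_add receives the string 'return_delay' rather than an integer column. A minimal check in plain PySpark (independent of dbldatagen, with made-up data) shows the difference:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import expr

schema = StructType([
    StructField("purchase_date", StringType()),
    StructField("return_delay", IntegerType()),
])
df = spark.createDataFrame([("2017-10-01", -2)], schema)

# Fails with the same AnalysisException: 'return_delay' is a string
# literal inside the expression, not a column reference
# df.select(expr("date_add('purchase_date', 'return_delay')"))

# Works: unquoted identifiers resolve to the columns
df.select(expr("date_add(purchase_date, return_delay)").alias("return_date")).show()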
Example 2:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import expr
import dbldatagen as dg
row_count = 1000 * 100
testDataSpec = (dg.DataGenerator(spark, name="test_data_set1", rows=row_count,
partitions=4, randomSeedMethod='hash_fieldname',
verbose=True)
.withColumn("purchase_id", IntegerType(), minValue=1000000, maxValue=2000000)
.withColumn("product_code", IntegerType(), uniqueValues=10000, random=True)
.withColumn("purchase_date", "date",
data_range=dg.DateRange("2017-10-01 00:00:00",
"2018-10-06 11:55:00",
"days=3"),
random=True)
.withColumn("return_delay", IntegerType(), values=[-1, -2, -3], weights=[9, 2, 1],
random=True, omit=True)
.withColumn("return_date", "date", expr="date_add(purchase_date, return_delay)")
)
dfTestData = testDataSpec.build()
display(dfTestData)
Error 2:
AnalysisException: cannot resolve '`purchase_date`' given input columns: [id]; line 1 pos 9;
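Error 2 looks like a column build-ordering issue rather than a SQL problem: the error shows the expression being resolved against only the seed id column, so the generator apparently needs to be told which columns the expression depends on. dbldatagen's withColumn takes a baseColumn option (a column name or a list of names) for declaring such dependencies; assuming that is what the documentation intended (the baseColumn usage below is my reading of the API, not taken from the example), the spec would look something like this:

from pyspark.sql.types import IntegerType
import dbldatagen as dg

row_count = 1000 * 100
testDataSpec = (dg.DataGenerator(spark, name="test_data_set1", rows=row_count,
                                 partitions=4, randomSeedMethod='hash_fieldname',
                                 verbose=True)
                .withColumn("purchase_id", IntegerType(), minValue=1000000, maxValue=2000000)
                .withColumn("product_code", IntegerType(), uniqueValues=10000, random=True)
                .withColumn("purchase_date", "date",
                            data_range=dg.DateRange("2017-10-01 00:00:00",
                                                    "2018-10-06 11:55:00",
                                                    "days=3"),
                            random=True)
                # generated for use by return_date, then dropped from the output
                .withColumn("return_delay", IntegerType(), values=[-1, -2, -3], weights=[9, 2, 1],
                            random=True, omit=True)
                # baseColumn declares the dependencies so they are built first
                .withColumn("return_date", "date",
                            expr="date_add(purchase_date, return_delay)",
                            baseColumn=["purchase_date", "return_delay"])
                )
dfTestData = testDataSpec.build()
display(dfTestData)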
The following works, but I am not sure it is the correct way to do this:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import expr
import dbldatagen as dg
row_count = 1000 * 100
testDataSpec = (dg.DataGenerator(spark, name="test_data_set1", rows=row_count,
partitions=4, randomSeedMethod='hash_fieldname',
verbose=True)
.withColumn("purchase_id", IntegerType(), minValue=1000000, maxValue=2000000)
.withColumn("product_code", IntegerType(), uniqueValues=10000, random=True)
.withColumn("purchase_date", "date",
data_range=dg.DateRange("2017-10-01 00:00:00",
"2018-10-06 11:55:00",
"days=3"),
random=True)
.withColumn("return_delay", IntegerType(), values=[-1, -2, -3], weights=[9, 2, 1],
random=True)
)
dfTestData = testDataSpec.build()
dfTestData = dfTestData.withColumn("return_date", expr("date_add(purchase_date, return_delay)"))
display(dfTestData)
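One difference worth noting between this workaround and what the documentation describes: here return_delay stays in the output DataFrame, whereas the docs imply it should be generated and then omitted. If the baseColumn spec above is right, keeping return_delay inside the generator with omit=True reproduces that behavior; otherwise it can be dropped after the fact with dfTestData.drop("return_delay").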