Giter Site home page Giter Site logo

cc-pyspark's Introduction

Common Crawl Logo

Common Crawl PySpark Examples

This project provides examples how to process the Common Crawl dataset with Apache Spark and Python:

Further information about the examples and available options is shown via the command-line option --help.

Implementing a Custom Extractor

Extending the CCSparkJob isn't difficult and for many use cases it is sufficient to override a single method (process_record). Have a look at one of the examples, e.g. to count HTML tags.

Setup

To develop and test locally, you will need to install

pip install -r requirements.txt
  • (optionally, and only if you want to query the columnar index) install S3 support libraries so that Spark can load the columnar index from S3

Compatibility and Requirements

Tested with with Spark 3.2.3, 3.3.2 and 3.4.1 in combination with Python 3.8, 3.9 and 3.10. See the branch python-2.7 if you want to run the job on Python 2.7 and older Spark versions.

Get Sample Data

To develop locally, you'll need at least three data files – one for each format used in at least one of the examples. They can be fetched from the following links:

Alternatively, running get-data.sh downloads the sample data. It also writes input files containing

  • sample input as file:// URLs
  • all input of one monthly crawl as paths relative to the data bucket base URL s3://commoncrawl/ resp. https://data.commoncrawl.org/ – see authenticated S3 access or access via HTTP for more information.

Note that the sample data is from an older crawl (CC-MAIN-2017-13 run in March 2017). If you want to use more recent data, please visit the Common Crawl site.

Process Common Crawl Data on Spark

Running locally

First, point the environment variable SPARK_HOME to your Spark installation. Then submit a job via

$SPARK_HOME/bin/spark-submit ./server_count.py \
	--num_output_partitions 1 --log_level WARN \
	./input/test_warc.txt servernames

This will count web server names sent in HTTP response headers for the sample WARC input and store the resulting counts in the SparkSQL table "servernames" in your warehouse location defined by spark.sql.warehouse.dir (usually in your working directory as ./spark-warehouse/servernames).

The output table can be accessed via SparkSQL, e.g.,

$SPARK_HOME/bin/pyspark
>>> df = sqlContext.read.parquet("spark-warehouse/servernames")
>>> for row in df.sort(df.val.desc()).take(10): print(row)
... 
Row(key=u'Apache', val=9396)
Row(key=u'nginx', val=4339)
Row(key=u'Microsoft-IIS/7.5', val=3635)
Row(key=u'(no server in HTTP header)', val=3188)
Row(key=u'cloudflare-nginx', val=2743)
Row(key=u'Microsoft-IIS/8.5', val=1459)
Row(key=u'Microsoft-IIS/6.0', val=1324)
Row(key=u'GSE', val=886)
Row(key=u'Apache/2.2.15 (CentOS)', val=827)
Row(key=u'Apache-Coyote/1.1', val=790)

But it's also possible to configure a different output format, for example CSV or JSON, see the command-line options.

See also

Running in Spark cluster over large amounts of data

As the Common Crawl dataset lives in the Amazon Public Datasets program, you can access and process it on Amazon AWS (in the us-east-1 AWS region) without incurring any transfer costs. The only cost that you incur is the cost of the machines running your Spark cluster.

  1. spinning up the Spark cluster: AWS EMR contains a ready-to-use Spark installation but you'll find multiple descriptions on the web how to deploy Spark on a cheap cluster of AWS spot instances. See also launching Spark on a cluster.

  2. choose appropriate cluster-specific settings when submitting jobs and also check for relevant command-line options (e.g., --num_input_partitions or --num_output_partitions, see below)

  3. don't forget to deploy all dependencies in the cluster, see advanced dependency management

  4. also the the file sparkcc.py needs to be deployed or added as argument --py-files sparkcc.py to spark-submit. Note: some of the examples require further Python files as dependencies.

Command-line options

All examples show the available command-line options if called with the parameter --help or -h, e.g.

$SPARK_HOME/bin/spark-submit ./server_count.py --help

Overwriting Spark configuration properties

There are many Spark configuration properties which allow to tune the job execution or output, see for example see tuning Spark or EMR Spark memory tuning.

It's possible to overwrite Spark properties when submitting the job:

$SPARK_HOME/bin/spark-submit \
    --conf spark.sql.warehouse.dir=myWareHouseDir \
    ... (other Spark options, flags, config properties) \
    ./server_count.py \
    ... (program-specific options)

Authenticated S3 Access or Access Via HTTP

Since April 2022 there are two ways to access of Common Crawl data:

  • using HTTP/HTTPS and the base URL https://data.commoncrawl.org/ or https://ds5q9oxwqwsfj.cloudfront.net/
  • using the S3 API to read the bucket s3://commoncrawl/ requires authentication and makes an Amazon Web Services account mandatory.

The S3 API is strongly recommended as the most performant access scheme, if the data is processed in the AWS cloud and in the AWS us-east-1 region. In contrary, if reading the data from outside the AWS cloud, HTTP/HTTPS access is the preferred option.

Dependent on the chosen access scheme, the data bucket's base URL needs to be passed using the command-line option --input_base_url:

  • --input_base_url https://data.commoncrawl.org/ when using HTTP/HTTPS
  • --input_base_url s3://commoncrawl/ when using the S3 API.

This project uses boto3 to access WARC, WAT or WET files on s3://commoncrawl/ over the S3 API. The best way is to ensure that a S3 read-only IAM policy is attached to the the IAM role of the EC2 instances where Common Crawl data is processed, see the IAM user guide. If this is no option (or if the processing is not running on AWS), there are various other options to configure credentials in boto3.

Please also note that querying the columnar index requires S3 access.

Querying the columnar index

The example tools to query the columnar URL index may require additional configuration and setup steps.

Supported access schemes for the columnar index

Querying the columnar index using cc-pyspark requires authenticated S3 access. There is no support for HTTP/HTTPS access. Please see here for more information about supported data access schemes.

Installation of S3 Support Libraries

While WARC/WAT/WET files are read using boto3, accessing the columnar URL index (see option --query of CCIndexSparkJob) is done directly by the SparkSQL engine and requires that S3 support libraries are available. These libs are usually provided when the Spark job is run on a Hadoop cluster running on AWS (eg. EMR). However, they may not be provided for any Spark distribution and are usually absent when running Spark locally (not in a Hadoop cluster). In these situations, the easiest way is to add the libs as required packages by adding --packages org.apache.hadoop:hadoop-aws:3.2.1 to the arguments of spark-submit. This will make Spark manage the dependencies - the hadoop-aws package and transitive dependencies are downloaded as Maven dependencies. Note that the required version of hadoop-aws package depends on the Hadoop version bundled with your Spark installation, e.g., Spark 3.2.1 bundled with Hadoop 3.2 (spark-3.2.1-bin-hadoop3.2.tgz).

Please also note that:

Below an example call to count words in 10 WARC records host under the .is top-level domain using the --packages option:

$SPARK_HOME/bin/spark-submit \
    --packages org.apache.hadoop:hadoop-aws:3.3.2 \
    ./cc_index_word_count.py \
    --input_base_url s3://commoncrawl/ \
    --query "SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset FROM ccindex WHERE crawl = 'CC-MAIN-2020-24' AND subset = 'warc' AND url_host_tld = 'is' LIMIT 10" \
    s3a://commoncrawl/cc-index/table/cc-main/warc/ \
    myccindexwordcountoutput \
    --num_output_partitions 1 \
    --output_format json

Columnar index and schema merging

The schema of the columnar URL index has been extended over time by adding new columns. If you want to query one of the new columns (e.g., content_languages), the following Spark configuration option needs to be set:

--conf spark.sql.parquet.mergeSchema=true

However, this option impacts the query performance, so use with care! Please also read cc-index-table about configuration options to improve the performance of Spark SQL queries.

Alternatively, it's possible configure the table schema explicitly:

Using FastWARC to parse WARC files

FastWARC is a high-performance WARC parsing library for Python written in C++/Cython. The API is inspired in large parts by WARCIO, but does not aim at being a drop-in replacement.

Replacing FastWARC can speed up job execution by 25% if little custom computations are done and most of the time is spent for parsing WARC files.

To use FastWARC

Some differences between the warcio and FastWARC APIs are hidden from the user in methods implemented in CCSparkJob and CCFastWarcSparkJob respectively. These methods allow to access WARC or HTTP headers and the payload stream in a unique way, regardless of whether warcio or FastWARC are used.

However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support for legacy ARC files and does not automatically decode HTTP content and transfer encodings (see Resiliparse HTTP Tools). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also WARC 1.1 specification, http/https response records.

Credits

Examples are originally ported from Stephen Merity's cc-mrjob with the following changes and upgrades:

  • based on Apache Spark (instead of mrjob)
  • boto3 supporting multi-part download of data from S3
  • warcio a Python 2 and Python 3 compatible module for accessing WARC files

Further inspirations are taken from

License

MIT License, as per LICENSE

cc-pyspark's People

Contributors

cronoik avatar jaehunro avatar praveenr019 avatar sebastian-nagel avatar xue-alex avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

cc-pyspark's Issues

Processing English only archives

First off, thanks for the great work on this data set.

I'd like to process archives that are English only. I saw that the columnar url index contained content_languages information (https://commoncrawl.s3.amazonaws.com/cc-index/table/cc-main/index.html). However, it seems that the index is for WARC files (though I understand that WET files are derived from WARC files). Thus, my initial plan was the following:

  1. Use AWS Athena to select rows for a single month partition where content_languages = "eng" and save this as a separate table in my own s3 bucket.
CREATE TABLE "ccindex-2020-10-eng"
WITH (
  format = 'Parquet',
  external_location = 's3://my-bucket/ccindex-2020-10-eng',
)
AS SELECT url, warc_filename, warc_record_offset, warc_record_length
FROM "ccindex"."ccindex"
WHERE crawl = 'CC-MAIN-2020-10'
  AND subset = 'warc'
  AND content_languages = 'eng';
  1. Use cc_index_url_match.py to process subset of WARC archives that are English only and use a regular expression to match for URLs in the document content that are camel case (e.g. www.ThisIsATest.com). The code is pretty much the same as cc-pyspark/cc_index_word_count.py with the exception of a few minor tweaks:
  • Remove the .persist() on the input index (https://github.com/commoncrawl/cc-pyspark/blob/master/sparkcc.py#L329) since the index this time around is fairly large (>988 million rows)
  • Change output_schema to just be a single key (the matched string found in document)
  • Call .distinct() on RDD following .mapPartitions to remove duplicates
  • Change regular expression
  1. Run the Spark job (locally or on AWS EMR)
    Local version (for debugging)
export SPARK_HOME=/some/path/to/spark-2.4.4-bin-hadoop2.7

$SPARK_HOME/bin/spark-submit \
  --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 \
  ./cc_index_word_count.py \
  --query "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM ccindex" \
  s3a://my-bucket/ccindex-2020-10-eng/ \
  output_table_name

I've also tried to set up the job to run on AWS EMR. The configuration I use on EMR is:
Hardware:
1 master = m5.xlarge (4 vCore, 16 GiB memory)
5 core = c5.4xlarge (16 vCore, 32 GiB memory)

Spark step:

spark-submit \
  --deploy-mode cluster \
  --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 \
  --py-files s3://my-bucket/sparkcc.py \
  s3://my-bucket/cc_index_url_match.py \
  --output_option path=s3://my-output-bucket/cc-main-2020-10-eng \
  --output_compression snappy --num_output_partitions 100 \
  --query "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM ccindex"\
  s3a://my-bucket/ccindex-2020-10-eng/ \
  output_table_name

I use --output_option path=s3://my-output-bucket/cc-main-2020-10-eng to save the output table to my designated s3 bucket. However, this EMR job had numerous issues. The job has been running for >16 hours now, and it seems that the task metrics aren't being reported correctly since all tasks for the saveAsTable are marked as pending and no executors are shown to have any active tasks. It seems like it's stuck with all pending stages and no active stages For more details, eventLogs-application_1590106415063_0001-1.zip.
As a sanity check, I used the exact same setup with a smaller version of the index with only 10 rows and it worked fine.

To summarize:

  • Is it just a bad idea to try to use an input index with this many rows?
  • Is it possible to just use WET files somehow?
    (e.g. process WET files without worrying about content_languages and just produce Row(match_url="www.ThisIsATest.com", target_uri="{record.rec_headers.get_header('WARC-Target-URI')}") then JOIN this later with the index table on target_uri = ccindex-2020-10-eng.url)
  • What is an efficient way to achieve the end result of processing English only content (w/o destroying my wallet)?

Thanks for your time.

Host link extraction does not represent every IDN as IDNA

The host link extractor (ExtractHostLinksJob in wat_extract_links.py) should represent every hostname which contains non-ASCII Unicode characters (internationalized domain names - IDN) as IDNA (the Puncode ASCII representation of the hostname). The check whether a hostname requires conversion to its IDNA equivalent is not complete. Examples of such hostnames from cc-main-2021-22-oct-nov-jan-host :

app.ıo
br.org.poſtgres
br.org.poſtgresql
top.oynayamıyor

These hostnames include very specific non-ASCII characters for which specific case-folding rules apply:

'poſtgres'.upper()  # 'POSTGRES'
'ıo'.upper().lower() # 'io'

Looks like ccspark tried to access everything from local file. What's wrong with the settings?

spark-3.3.2-bin-hadoop3/bin/spark-submit ./server_count. --num_output_partitions 1 --log_level WARN ./input/wat.gz servernames

23/02/18 09:20:39 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.41, 56238, None)
2023-02-18 09:20:52,155 INFO CountServers: Reading local file WARC/1.0
2023-02-18 09:20:52,156 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC/1.0: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC/1.0'
2023-02-18 09:20:52,157 INFO CountServers: Reading local file WARC-Type: warcinfo
2023-02-18 09:20:52,158 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC-Type: warcinfo: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC-Type: warcinfo'
2023-02-18 09:20:52,158 INFO CountServers: Reading local file WARC-Date: 2017-04-01T22:37:17Z
2023-02-18 09:20:52,159 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC-Date: 2017-04-01T22:37:17Z: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC-Date: 2017-04-01T22:37:17Z'
2023-02-18 09:20:52,160 INFO CountServers: Reading local file WARC-Filename: CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz
2023-02-18 09:20:52,161 ERROR CountServers: Failed to open /Users/joe/cc-pyspark/WARC-Filename: CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz: [Errno 2] No such file or directory: '/Users/joe/cc-pyspark/WARC-Filename: CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz'
2023-02-18 09:20:52,163 INFO CountServers: Reading local file WARC-Record-ID: urn:uuid:55d1a532-f91b-4461-b803-9bfc77efa410

How To: process CC NEWS warc files, most recent first

This is more a 'how would I do...' question than an issue report.

I am processing the commoncrawl news WARC files with multiple python processes on a Kubernetes cluster. I am in progress of implementing cc-pyspark instead, to improve the parallelism.

Now, my main issue is that the cluster should process the most recent WARC file first, continuously released every hour (?), and only process older files when there is no new file pending.
Both my own setup and - as far as I know - cc-pyspark will only work on a specific set of WARC files. For instance, I can store a list of all 14183 news warc files in a DataFrame to be dispatched to all executors, but I can not dynamically add the new WARC new files first in that DataFrame.

One idea was to use Spark Structured Streaming with two queues, where the list of existing files are added in one queueStream and the new incoming files are appended to the second queueStream. That second one then needs to be consumed first, so has higher priority.
Not sure if that's possible with standard py-spark, or Spark/Scala.

Is this something you (or any cc-pyspark user) has encountered? I'm looking for directions, but working code is also fine. :)
Thanks.

Can not run server_count example on Windows locally

I tried to call:

$SPARK_HOME/bin/spark-submit ./server_count.py \
	--num_output_partitions 1 --log_level WARN \
	./input/test_warc.txt servernames

But getting error:

py4j.protocol.Py4JJavaError: An error occurred while calling o55.saveAsTable

I installed Hadoop 3.0.0 from here https://github.com/steveloughran/winutils.

I am calling under Windows 7, Python 3.6.6 64 bit, Java 8.

Full log is:

21/03/16 14:49:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/03/16 14:49:08 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Traceback (most recent call last):
File "server_count.py", line 46, in
job.run()
File "C:\Users\FA.PROJECTOR-MSK\Google Диск\Colab Notebooks\Finance\cc-pyspark\sparkcc.py", line 152, in run
self.run_job(sc, sqlc)
File "C:\Users\FA.PROJECTOR-MSK\Google Диск\Colab Notebooks\Finance\cc-pyspark\sparkcc.py", line 187, in run_job
.saveAsTable(self.args.output)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\pyspark\sql\readwriter.py", line 1158, in saveAsTable
self._jwrite.saveAsTable(name)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\py4j\java_gateway.py", line 1305, in call
answer, self.gateway_client, self.target_id, self.name)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\pyspark\sql\utils.py", line 111, in deco
return f(*a, **kw)
File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o56.saveAsTable.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.validateTableLocation(SessionCatalog.scala:356)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:170)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:753)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:731)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:626)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)

Incompatible Architecture

I am using a 2021 iMac with the Apple M1 chip and macOS Monterey 12.4.

So far to set up PySpark I have pip3 installed pyspark, plus cloned this repo and installed from the requirements.txt file, plus downloaded Java from their homepage. I'm using Python 3.8.9.

I added the path to the pip3 installation of pyspark to SPARK_HOME in my .zshrc and sourced it:

% echo $SPARK_HOME
/Users/julius/Library/Python/3.8/lib/python/site-packages/pyspark

I then executed the following command:

$SPARK_HOME/bin/spark-submit ./server_count.py \
	--num_output_partitions 1 --log_level WARN \
	./input/test_warc.txt servernames

I had to execute this from inside the cc-pyspark repo, otherwise the script could not find the program server_count.py.

It returns this error message:

julius@Juliuss-iMac cc-pyspark % $SPARK_HOME/bin/spark-submit ./server_count.py \
        --num_output_partitions 1 --log_level WARN \
        ./input/test_warc.txt servernames
Traceback (most recent call last):
  File "/Users/julius/cc-pyspark/server_count.py", line 1, in <module>
    import ujson as json
ImportError: dlopen(/Users/julius/Library/Python/3.8/lib/python/site-packages/ujson.cpython-38-darwin.so, 0x0002): tried: '/Users/julius/Library/Python/3.8/lib/python/site-packages/ujson.cpython-38-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64'))
22/07/06 15:04:13 INFO ShutdownHookManager: Shutdown hook called
22/07/06 15:04:13 INFO ShutdownHookManager: Deleting directory /private/var/folders/xv/yzpjb77s2qg14px8dc7g4m_80000gn/T/spark-80c476e9-b5ba-4710-b292-e367dd387ece

There's something wrong with my installation of "ujson", it is for arm, but PySpark is designed for x86? Is that correct?

What is the simplest way to fix this issue? Should I try to run PySpark in some kind of x86 emulation like Rosetta? Has PySpark not been designed for the M1 Chip?

Is there a chance this is the fault of my Java installation? I took the first one offered; it seemed to say x86, but when I tested running PySpark on its own, it seemed to work fine.

Thanks very much

Provide classes to use FastWARC to read WARC/WAT/WET files

FastWARC (see also FastWARC API docs) is a Python WARC parsing library

  • written in C++ for high performance
  • although inspired by warcio, not API compatible
  • without less-frequently used features, eg. reading ARC files or (as of now) chunked transfer encoding

Ideally, API differences between FastWARC and warcio should be hidden away in methods in CCSparkJob or a derived class, so that users do not have to care about the differences, except for very specific cases. Because of the differences and the required compilation of C++ components, usage of FastWARC should be optional.

Test and update examples to work with ARC files of the 2008 - 2012 crawls

warcio is able to read ARC files as well, so it should be possible to run all examples designed to work on WARC files also on ARC files from the 2008 - 2012 crawls.

  • needs to be tested whether the WARC examples can be run successfully on ARC files, minor modifications may be necessary
  • could extend some WAT/WET examples so that they can be used also on WARC/ARC (cf. server_count.py which may use WARC or WAT)

Common Crawl Index Table - Need for Schema Merging to be documented

Hi,

if some needs to load the full index table (parquet file) into pyspark with all latest fields, there is a need for setting the Spark property spark.sql.parquet.mergeSchema to "true" or use the following

df = spark.read.option("mergeSchema", "true").parquet('s3://commoncrawl/cc-index/table/cc-main/warc/')

Without this, fields that were added at a later stage like content_languagues are not loaded in the spark dataframe.

Maybe we could also provide the complete schema to Spark, so that there is
no need to extract the schema initially from (one of) the Parquet files.

Thanks

Webgraph construction: include nodes with zero outgoing links

Pages (or hosts) without outgoing links are lost during webgraph construction:

  • during the first step (wat_extract_links.py) the graph is represented as list of edges <url_from, url_to> resp. <host_from, host_to>. Pages/hosts without outgoing links need to be represented as self-loops in order not to lose them.
  • optionally the second step (hostlinks_to_graph.py) could remove the self-loops but keep the isolated vertices in the <vertex_id, label> mapping. Note: self-loops are often ignored or must even be eliminated when ranking nodes by centrality metrics.

Drop support for Python 2.7

Spark has dropped support for Python 2.7 (3.4 and 3.5), see SPARK-32138. The latest Spark versions supporting Python 2.7 were released summer 2021 (2.4.8 and 3.0.3). It's time for cc-pyspark to also drop support for older Python versions. That's also a requirement for the FastWARC parser #37.

Document dependency of CCIndexSparkJob to Java S3 file system libs

(reported by @calee88 in #12)

While CCSparkJob uses the boto3 to read WARC/WAT/WET files, CCIndexSparkJob requires that the Spark installation includes libs/jars to access data on S3 (s3://commoncrawl/). These are usually provided when a Spark is used in a Hadoop cluster (eg. EMR, Spark on Yarn) but may not for any Spark package esp. when running Spark locally (not in a Hadoop cluster). Also add information about

  • Hadoop S3 FilesSystem implementations requiring to adapt the schema part of the data URI (s3:// on EMR, s3a:// when using s3a)
  • accessing data anonymously (s3a.AnonymousAWSCredentialsProvider).

boto3 credentials error when running CCSparkJob with ~100 S3 warc paths as input, but works with <10 S3 warc paths as input

  • Created a spark job subclassing CCSparkJob to retrieve html text data. This job is working when passing input file with <10 S3 warc paths, but throwing below error when running with around 100 S3 warc paths. Could you please share your thoughts on what must be is causing this.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/__pyfiles__/sparkcc.py", line 355, in process_warcs
    stream = self.fetch_warc(uri, self.args.input_base_url)
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1654076989914_0005/container_1654076989914_0005_03_000005/__pyfiles__/sparkcc.py", line 290, in fetch_warc
    self.get_s3_client().download_fileobj(bucketname, path, warctemp)
  File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 795, in download_fileobj
    return future.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 266, in result
    raise self._exception
  File "/usr/local/lib/python3.7/site-packages/s3transfer/tasks.py", line 269, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/s3transfer/download.py", line 357, in _submit
    **transfer_future.meta.call_args.extra_args,
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 508, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 895, in _make_api_call
    operation_model, request_dict, request_context
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 917, in _make_request
    return self._endpoint.make_request(operation_model, request_dict)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 116, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 195, in _send_request
    request = self.create_request(request_dict, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 134, in create_request
    operation_name=operation_model.name,
  File "/usr/local/lib/python3.7/site-packages/botocore/hooks.py", line 412, in emit
    return self._emitter.emit(aliased_event_name, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/hooks.py", line 256, in emit
    return self._emit(event_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/hooks.py", line 239, in _emit
    response = handler(**kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/signers.py", line 103, in handler
    return self.sign(operation_name, request)
  File "/usr/local/lib/python3.7/site-packages/botocore/signers.py", line 187, in sign
    auth.add_auth(request)
  File "/usr/local/lib/python3.7/site-packages/botocore/auth.py", line 407, in add_auth
    raise NoCredentialsError()
botocore.exceptions.NoCredentialsError: Unable to locate credentials

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:954)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:287)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:133)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Commands to execute python files?

It would have been helpful, if there were some command examples for each .py files.
Or am I not finding those?
For now, I need to read every line of codes to understand the examples.
Still, I appreciate the examples, it would be much harder without the examples.

Bad Substitution

Alternatively, running get-data.sh downloads the sample data.

% ./get-data.sh
./get-data.sh: line 20: Downloading Common Crawl paths listings (${data_type^^} files of $CRAWL)...: bad substitution

Could anyone explain why my attempts at running this script result in this message?

Thank you

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.