edwardcapriolo / filecrush
Remedy small files by combining them into larger ones.
Hadoop filecrusher. Turn many small files into fewer larger ones. Also change from text to sequence and other compression options in one pass.

Crush

NAME
    Crush - Crush small files in dfs to fewer, larger files

SYNOPSIS
    Crush [OPTION]... <input dir> <output dir> <timestamp>

DESCRIPTION
    Crush consumes directories containing many small files with the same key and value types and creates fewer, larger files containing the same data. Crush gives you the control to:

    * Name the output files
    * Ignore files that are "big enough"
    * Limit the size of each output file
    * Control the output compression codec
    * Swap smaller files with generated large files in-place
    * Avoid the long-running task problem

    See the EXAMPLES section.

ARGUMENTS
    input dir
        The root of the directory tree to crush. Directories are found recursively.

    output dir
        In non-clone mode, the directory where the output files should be written. In clone mode, the directory where the original files (that were combined into larger files) should be moved.

    timestamp
        A 14 digit job timestamp used to uniquely name files. E.g. 20100221175612. Generate in a script with: date +%Y%m%d%H%M%S

GLOBAL OPTIONS
    -?, --help
        Print this help message.

    --threshold
        Percent threshold relative to the dfs block size at or below which a file becomes eligible for crushing. Must be in the range (0, 1]. Default is 0.75, which means files smaller than or equal to 75% of a dfs block will be eligible for crushing. Files greater than 75% of a dfs block will be left untouched.

    --max-file-blocks
        The maximum number of dfs blocks per output file. Must be a positive integer. Small input files are associated with an output file under the assumption that input and output compression codecs have similar efficiency. Also, a directory containing a lot of data in many small files will be converted into a directory containing a smaller number of large files rather than one super-massive file.
        With the default value 8, 80 small files, each being 1/10th of a dfs block, will be grouped into a single output file since 80 * 1/10 = 8 dfs blocks. If there are 81 small files, each being 1/10th of a dfs block, two output files will be created. One output file will contain the combined contents of 41 files and the second will contain the combined contents of the other 40. A directory of many small files will be converted into a smaller number of larger files where each output file is roughly the same size.

    --compress
        Fully qualified class name of the compression codec to use when writing data. It is permissible to use "none" and "gzip" to indicate no compression and org.apache.hadoop.io.compress.GzipCodec, respectively.

    --clone
        Use clone mode. Useful for external Hive tables. In clone mode, the small files are replaced with the larger files. The small files are moved to a subdirectory of the output dir argument. The subdirectory is the same as the original directory rooted at output dir. For example, assume the input dir argument and output dir argument are /user/example/input and /user/example/output, respectively. If a file was originally /user/example/input/my-dir/smallfile, then after the clone, the original file would be located in /user/example/output/user/example/input/my-dir/smallfile.

    --info
        Print information to the console about what the crush is doing.

    --verbose
        Print even more information to the console about what the crush is doing.

DIRECTORY OPTIONS
    If specified, these options must appear as a group. When specifying multiple groups of these options, order matters. Defaults for directory options are not used if any are specified. See the EXAMPLES section.

    --regex
        Regular expression that matches a directory name. Defaults to .+ if no directory options are specified at all. Empty directories are not required to have a matching regex. Conceptually similar to the first argument of String.replaceAll().
    --replacement
        Replacement string used with the corresponding regex to name output files. Defaults to crushed_file-${crush.timestamp}-${crush.task.num}-${crush.file.num} if no directory options are specified at all. The placeholder ${crush.timestamp} refers to the command line argument. ${crush.task.num} refers to the reducer number. ${crush.file.num} is a zero-based count of files produced by a specific reducer. The first file written by a reducer will have ${crush.file.num} = 0, the second = 1, the third = 2, etc. Conceptually similar to the second argument of String.replaceAll().

    --input-format
        Fully qualified class name of the input format for the data in a directory. Can use the "text" and "sequence" shortcuts for org.apache.hadoop.mapred.TextInputFormat and org.apache.hadoop.mapred.SequenceFileInputFormat, respectively. Defaults to sequence if no directory options are specified.

    --output-format
        Fully qualified class name of the output format to use when writing the output file for a directory. Can use the "text" and "sequence" shortcuts for org.apache.hadoop.mapred.TextOutputFormat and org.apache.hadoop.mapred.SequenceFileOutputFormat, respectively. Defaults to sequence if no directory options are specified.

EXAMPLES
    Say we have the following files:

    /user/example/work/input/
        small-file1
        small-file2
        small-file3
        small-file4
        big-enough-file
        subdir/
            small-file6
            small-file7
            small-file8
            medium-file1
            medium-file2

    And we invoke the crush like this:

    Crush /user/example/work/input /user/example/work/output 20100221175612

    Since we have not specified any of the directory options, the default regex, replacement, input-format, and output-format are used.
    We will get:

    /user/example/work/
        input/
            small-file1
            small-file2
            small-file3
            small-file4
            subdir/
                small-file6
                small-file7
                small-file8
                medium-file1
                medium-file2
        output/
            crushed_file-20100221175612-0-0
            big-enough-file
            subdir/
                crushed_file-20100221175612-1-0
                crushed_file-20100221175612-1-1

    Where:

    crushed_file-20100221175612-0-0 = small-file1 + small-file2 + small-file3 + small-file4
    crushed_file-20100221175612-1-0 = medium-file1 + small-file6 + small-file8
    crushed_file-20100221175612-1-1 = medium-file2 + small-file7

    Notice how big-enough-file was moved to the output directory. The input directory contains only the files that were combined into the larger files.

    By default, the output file names end with two numbers. The first number is the task number of the reducer that wrote the file. The second number is the zero-based file count of that specific reducer. So a file ending with 0-0 was produced by reducer 0 and was the first file written by that reducer. A file ending 0-1 is the second file written by that reducer. A file ending 1-0 was produced by reducer 1 and was the first file written by that reducer.

    In the example, notice how the directory subdir was converted into two files. If mapred.reduce.tasks permits, multiple reducers can cooperate to crush a large directory.

    Now a clone example. Say we invoked the crush like this:

    Crush --clone /user/example/work/input /user/example/clone 20100221175612

    With the clone option, we would end up with:

    /user/example/
        work/input/
            crushed_file-20100221175612-0-0
            big-enough-file
            subdir/
                crushed_file-20100221175612-1-0
                crushed_file-20100221175612-1-1
        clone/user/example/input/
            small-file1
            small-file2
            small-file3
            small-file4
            subdir/
                small-file6
                small-file7
                small-file8
                medium-file1
                medium-file2

    Note how the original directory structure of /user/example/input as it appeared before the crush is reproduced in /user/example/clone.
    The small files that were combined are moved to the clone directory while the output files and the files that were "big enough" are now in the input directory. Clone mode is useful for crushing external Hive tables. Just make sure that there are no Hive queries running on the table because they will fail when the small files are moved to the clone directory.

    Now we try an example using the directory options. Say we invoke the crush like this to control the output file names:

    Crush \
        --regex=.*/(.+) \
        --replacement=$1-${crush.timestamp}-${crush.task.num}-${crush.file.num} \
        --input=sequence \
        --output=sequence \
        /user/example/work/input /user/example/work/output 20100221175612

    The --regex and --replacement arguments are similar to the arguments passed to String.replaceAll(). The regex argument matches the final part of a directory path. For /user/example/work/input, it will match input. For /user/example/work/input/subdir, it will match subdir. For matching purposes, a directory path does not have a trailing slash. The replacement argument refers to the match group by number to rename the file. The result is:

    /user/example/work/output/
        input-20100221175612-0-0
        big-enough-file
        subdir/
            subdir-20100221175612-1-0
            subdir-20100221175612-1-1

    The regex and replacement options are useful for naming the output files when crushing external Hive tables that are partitioned into directories whose names have business significance.

    The following invocation fails:

    Crush \
        --regex=.*/input \
        --replacement=input-${crush.timestamp}-${crush.task.num}-${crush.file.num} \
        --input=sequence \
        --output=sequence \
        /user/example/work/input /user/example/work/output 20100221175612

    Since we have specified some directory options, we must ensure that all directories in the hierarchy rooted at the input argument have a matching regex (since the default regex is no longer applicable). In this invocation, there is no regex argument that matches /user/example/work/input/subdir.
    We must change it to:

    Crush \
        --regex=.*/input \
        --replacement=input-${crush.timestamp}-${crush.task.num}-${crush.file.num} \
        --input=sequence \
        --output=sequence \
        --regex=.*/subdir \
        --replacement=as-text-${crush.timestamp}-${crush.task.num}-${crush.file.num} \
        --input=sequence \
        --output=text \
        /user/example/work/input /user/example/work/output 20100221175612

    This will yield:

    /user/example/work/output/
        input-20100221175612-0-0
        big-enough-file
        subdir/
            as-text-20100221175612-1-0
            as-text-20100221175612-1-1

    Notice subdir has two files whose names differ only by the ${crush.file.num} value. Without the ${crush.file.num}, file names are not guaranteed to be unique.

NOTES
    This program creates temporary directories in "tmp" of the executing user's home directory in dfs.

https://zenodo.org/badge/doi/10.5281/zenodo.11038.png
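
The --max-file-blocks arithmetic from GLOBAL OPTIONS (80 files give one output file, 81 files give two holding 41 and 40) can be checked with a small sketch. This is not Crush's actual code: the class and method names are hypothetical, the 128 MiB block size is an assumed value, and the greedy "largest file into the lightest bucket" packing is an assumption that merely reproduces the numbers in the help text.

```java
import java.util.Arrays;

class CrushBucketSketch {
    /** Builds n file sizes that are all equal (helper for the demo). */
    static long[] uniform(int n, long size) {
        long[] sizes = new long[n];
        Arrays.fill(sizes, size);
        return sizes;
    }

    /** Output file count: total input bytes divided by the per-file cap, rounded up. */
    static int outputFileCount(long[] fileSizes, long blockSize, int maxFileBlocks) {
        long total = Arrays.stream(fileSizes).sum();
        long cap = blockSize * maxFileBlocks;
        return (int) Math.max(1, (total + cap - 1) / cap);
    }

    /** Assumed packing: take files largest first, always adding to the lightest bucket. */
    static int[] filesPerBucket(long[] fileSizes, int buckets) {
        long[] sorted = fileSizes.clone();
        Arrays.sort(sorted); // ascending, so walk it backwards
        long[] load = new long[buckets];
        int[] count = new int[buckets];
        for (int i = sorted.length - 1; i >= 0; i--) {
            int lightest = 0;
            for (int b = 1; b < buckets; b++) {
                if (load[b] < load[lightest]) {
                    lightest = b;
                }
            }
            load[lightest] += sorted[i];
            count[lightest]++;
        }
        return count;
    }

    public static void main(String[] args) {
        long blockSize = 134217728L;     // assumed 128 MiB dfs block
        long tenthOfBlock = blockSize / 10;
        for (int n : new int[] {80, 81}) {
            long[] sizes = uniform(n, tenthOfBlock);
            int outputs = outputFileCount(sizes, blockSize, 8);
            // prints "80 files -> 1 output(s) [80]" then "81 files -> 2 output(s) [41, 40]"
            System.out.println(n + " files -> " + outputs + " output(s) "
                    + Arrays.toString(filesPerBucket(sizes, outputs)));
        }
    }
}
```

Sizes are kept as whole bytes so the rounding-up step does not depend on floating-point error at the 8-block boundary.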
Does it support merging Avro files? When I provide the following command, I get an error saying that AvroInputFormat is not a FileInputFormat.
Command:
hadoop jar filecrush-2.2.2-SNAPSHOT.jar com.m6d.filecrush.crush.Crush -Ddfs.block.size=134217728 \
--input-format="org.apache.avro.mapred.AvroInputFormat" \
--output-format="org.apache.avro.mapred.AvroInputFormat" \
/data/dir /data/dir-merge 20100222177812
Error:
Not a FileInputFormat:org.apache.avro.mapred.AvroInputFormat
at com.m6d.filecrush.crush.Crush.createJobConfAndParseArgs(Crush.java:531)
I managed to run filecrush for the first time, and after everything seemed to finish successfully I got this error. In fact, although it reported loads of files to crush, it did not crush any.
Exception in thread "main" java.io.IOException: not a gzip file
at org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor.processBasicHeader(BuiltInGzipDecompressor.java:496)
at org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor.executeHeaderState(BuiltInGzipDecompressor.java:257)
at org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor.decompress(BuiltInGzipDecompressor.java:186)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:91)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:72)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2281)
at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2304)
at com.m6d.filecrush.crush.Crush.cloneOutput(Crush.java:769)
at com.m6d.filecrush.crush.Crush.run(Crush.java:666)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.m6d.filecrush.crush.Crush.main(Crush.java:1330)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
My command line:
hadoop jar ./filecrush-2.2.2-SNAPSHOT.jar com.m6d.filecrush.crush.Crush --info --clone --verbose --compress gzip --input-format text --output-format text /user/camus/tests/topics/ /user/camus/tests/topics_orig/ 20101121121212
Why does it say "SequenceFile"? I have gzipped JSON (i.e. text), soon to be Snappy-compressed JSON.
Not clear under what license this code is available.
It looks like there is no way to set the number of reduces with V2 style arguments.
It seems this was supported with V1 style arguments.
int maxTasks = Integer.parseInt(args[2]);
job.setInt("mapred.reduce.tasks", maxTasks);
Is there any specific reason that it is not available in V2 or am I missing something?
Hi,
I'm sorry if this is a dumb question, but I can't figure out how to run the file crusher on my Hadoop cluster - I keep getting a class not found error. This is the command I'm running:
hadoop jar filecrush-2.2.2-SNAPSHOT.jar Crush /user/zslf023/pdb/all /user/zslf023/pdb/tenkcrushed 201424071559
which then returns:
Exception in thread "main" java.lang.ClassNotFoundException: Crush
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
Could you please let me know where I'm going wrong?
Thanks in advance!
Ana
~/git/filecrush[1131] $ hadoop jar target/filecrush-2.2.2-SNAPSHOT.jar com.m6d.filecrush.crush.Crush /user/arustamov/4 /user/arustamov/crush 20160701073112
16/07/01 07:31:50 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
16/07/01 07:31:50 INFO Configuration.deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
16/07/01 07:31:50 INFO Configuration.deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
16/07/01 07:31:52 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
16/07/01 07:31:52 INFO compress.CodecPool: Got brand-new compressor [.deflate]
16/07/01 07:31:52 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
Exception in thread "main" java.io.FileNotFoundException: File hdfs://nameservice1/user/arustamov/tmp/crush-ee981b79-cb49-4a0b-9d61-f876c349dfd5/out does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:705)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:763)
at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:759)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:759)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
at com.m6d.filecrush.crush.Crush.getOutputMappings(Crush.java:795)
at com.m6d.filecrush.crush.Crush.moveOutput(Crush.java:814)
at com.m6d.filecrush.crush.Crush.run(Crush.java:668)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.m6d.filecrush.crush.Crush.main(Crush.java:1330)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Hi Folks,
This is not a bug as such. I am just not sure about the capabilities of the --regex and --replacement features.
What I want, ideally, is to convert "one directory per hour" directories, e.g.
...somedirectory/2015/05/10/21/...lots of files...
...somedirectory/2015/05/10/22/...lots of files...
...somedirectory/2015/05/10/23/...lots of files...
...somedirectory/2015/05/11/00/...lots of files...
...somedirectory/2015/05/11/01/...lots of files...
...somedirectory/2015/05/11/02/...lots of files...
into "one directory per day"
...somedirectory/2015/05/10/oneBigFile
...somedirectory/2015/05/11/oneBigFile
or, if necessary
...somedirectory/2015/05/10/00/oneBigFile
...somedirectory/2015/05/11/00/oneBigFile
(And ideally I'd love it to tell Hive HCatalog at the same time, but that might be asking too much)
I am trying to use the --regex and --replacement features to do this. Should it work?
This just adds in a new directory
--regex=".*/\d\d/(.+)"
--replacement=00/$1-${crush.timestamp}-${crush.task.num}-${crush.file.num} \
Should I be trying something like
--regex=".*/(\d\d)/(.+)"
--replacement=00/$2-${crush.timestamp}-${crush.task.num}-${crush.file.num} \
I suppose my fallback solution would be to move everything from the low-level directories one directory up before running the file crush. That would be a bit of a pain. I suppose I could write a Perl or shell script that runs "hadoop fs -mv" commands to do that.
Alex
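
For reference, here is a minimal sketch of what String.replaceAll() produces for the two pattern pairs in this question. It relies on the README's statement that --regex and --replacement are conceptually similar to the arguments of String.replaceAll(); the class name, the example path prefix /data, the timestamp value, and the choice to expand the ${crush.*} placeholders literally before applying the regex are all assumptions, and the sketch says nothing about whether Crush will treat the "00/" prefix as a directory.

```java
class HourlyRegexSketch {
    /**
     * Hypothetical helper: fills in the ${crush.*} placeholders as plain text,
     * then applies the regex to the directory path with String.replaceAll().
     */
    static String outputName(String dir, String regex, String replacement,
                             String timestamp, int taskNum, int fileNum) {
        String expanded = replacement
                .replace("${crush.timestamp}", timestamp)
                .replace("${crush.task.num}", Integer.toString(taskNum))
                .replace("${crush.file.num}", Integer.toString(fileNum));
        return dir.replaceAll(regex, expanded);
    }

    public static void main(String[] args) {
        String hourDir = "/data/somedirectory/2015/05/10/21"; // assumed absolute form of the path
        String ts = "20150511000000";                         // assumed 14 digit timestamp
        String suffix = "-${crush.timestamp}-${crush.task.num}-${crush.file.num}";

        // First attempt: group 1 captures the hour directory name.
        System.out.println(outputName(hourDir, ".*/\\d\\d/(.+)", "00/$1" + suffix, ts, 0, 0));
        // prints 00/21-20150511000000-0-0

        // Second attempt: the day is now group 1 and the hour is group 2.
        System.out.println(outputName(hourDir, ".*/(\\d\\d)/(.+)", "00/$2" + suffix, ts, 0, 0));
        // prints 00/21-20150511000000-0-0
    }
}
```

Both attempts yield the same string, because the extra parentheses only renumber the groups; the captured hour is unchanged. The regex and replacement rename the output file for each matched directory, which is consistent with the observation that the first attempt "just adds in a new directory".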
LZO seems to almost work; I'm not sure if it's known to or if there is a slight bug. I hope I'm missing something everyone else can see
When I run this command:
hadoop jar filecrush-2.2.2-SNAPSHOT.jar com.m6d.filecrush.crush.Crush \
--compress com.hadoop.compression.lzo.LzopCodec \
--input-format text \
/user/hive/warehouse/test/actionlog/ \
/user/hive/warehouse/temp/test/actionlog/ \
20101121121212
It completes the map and reduce tasks, then throws an exception at the very end. The part I'm curious about is that it seems to be expecting a sequence file, when really it's a text file. I see the same results whether I set --output-format to sequence or text:
Exception in thread "main" java.io.EOFException: Premature EOF from inputStream
at com.hadoop.compression.lzo.LzopInputStream.readFully(LzopInputStream.java:75)
at com.hadoop.compression.lzo.LzopInputStream.readHeader(LzopInputStream.java:114)
at com.hadoop.compression.lzo.LzopInputStream.<init>(LzopInputStream.java:54)
at com.hadoop.compression.lzo.LzopCodec.createInputStream(LzopCodec.java:83)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1916)
at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
at com.m6d.filecrush.crush.Crush.moveOutput(Crush.java:824)
at com.m6d.filecrush.crush.Crush.run(Crush.java:668)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.m6d.filecrush.crush.Crush.main(Crush.java:1330)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Dear devs,
I wanted first to thank you for this piece of software, really great!
I have one request I would like to raise with you, if possible. Could you please change the code so that files starting with "_" are ignored? The use case is as follows:
I have a data source that is quite slow. I use Apache Flume to store that data into HDFS. Because the data velocity is low, I set up Flume to roll to a new file after 10 minutes. This creates a lot of small files, which your crusher handles just perfectly.
Now the issue is that Flume's temp files (i.e. files that are not closed yet) start with "_" and have ".tmp" appended. When I run the crusher, if the file is closed in the meantime, well... the file is not found. I would also like to avoid errors on Flume's side and thus avoid manipulating those files.
The request is thus to either have a new option to ignore files starting with "_" or just ignore them by default.
Thanks a lot!
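
A minimal sketch of the requested rule, for illustration only: it is not part of filecrush, and the class name, method names, and example paths are hypothetical. It keeps a path as a crush candidate only when its last component does not start with "_".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class UnderscoreFilterSketch {
    /** Returns the last path component, e.g. "/a/b/_x.tmp" gives "_x.tmp". */
    static String baseName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Hypothetical rule from the request: skip any file whose name starts with "_". */
    static boolean isCrushCandidate(String path) {
        return !baseName(path).startsWith("_");
    }

    /** Filters a directory listing down to the files that would be considered. */
    static List<String> candidates(List<String> paths) {
        return paths.stream()
                .filter(UnderscoreFilterSketch::isCrushCandidate)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
                "/flume/events/FlumeData.1.log",
                "/flume/events/_FlumeData.2.log.tmp", // still being written by Flume
                "/flume/events/FlumeData.3.log");
        // prints [/flume/events/FlumeData.1.log, /flume/events/FlumeData.3.log]
        System.out.println(candidates(listing));
    }
}
```

The check looks at the file name only, so a directory such as /flume/_staging would not by itself exclude the files under it.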
Can you mention the Maven build command for this project? Sorry, I am very new to Maven.
`$ hadoop jar target/filecrush-2.2.2-SNAPSHOT.jar com.m6d.filecrush.crush.Crush --info --verbose --threshold=0.1 --compress=gzip /user/arustamov/crush{17,18} $(date +%Y%m%d%H%M%S)
outDir is:
tmp/crush-a7ea6dac-c48a-483f-a652-1be09b8cfaff/out
Using temporary directory tmp/crush-a7ea6dac-c48a-483f-a652-1be09b8cfaff
16/07/01 09:56:51 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
16/07/01 09:56:51 INFO compress.CodecPool: Got brand-new compressor [.deflate]
16/07/01 09:56:51 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
/user/arustamov/crush17 has no crushable files
Skipped 2 files
/user/arustamov/crush17/part-00402
/user/arustamov/crush17/part-00397
/user/arustamov/crush17/subdir has no crushable files
Skipped 1 files
/user/arustamov/crush17/subdir/part-00401
Copying crush files to /user/arustamov/crush18
Moving skipped files to /user/arustamov/crush18
/user/arustamov/crush17/subdir/part-00401 => /user/arustamov/crush18/subdir/part-00401
/user/arustamov/crush17/part-00402 => /user/arustamov/crush18/part-00402
/user/arustamov/crush17/part-00397 => /user/arustamov/crush18/part-00397
Deleting temporary directory
`
Hi,
I am facing this issue while trying to crush a directory:
hadoop jar /home/cloudera-scm/sunny/filecrush-master/target/filecrush-2.2.2-SNAPSHOT.jar com.m6d.filecrush.crush.Crush home/cloudera-scm/sunny/OCNs/ /home/cloudera-scm/sunny/testing/ 20200901010101
Exception in thread "main" java.lang.IllegalArgumentException: The value of property avro.output.codec must not be null
Did anyone run into this exception? Any solutions will be appreciated.
When we run the crusher on a Hive directory, it works like a charm the first time. On the next run, however, it can't move the original files, so we end up with both crushed and original files in the Hive directory. The problem comes from the way the crusher moves files after crushing them. The original files from the first run are already in the destination, and Hive creates part files starting with 00000-0 again, so the source directory ends up with files that have the same names as files already in the destination directory (from the first run). Because of this name conflict, the move operation fails.