
rxjava-file's Introduction

rxjava-file



Status: released to Maven Central

Requires Java 7.

Observable utilities for files:

  • tail a file (either lines or byte[])
  • trigger tail updates using Java 7 and later NIO WatchService events
  • or trigger tail updates using any Observable
  • stream WatchEvents from a WatchService
  • backpressure support
  • tested on Linux and Windows 7 (not OSX, help appreciated!)

Release Notes

Maven site reports are here including javadoc.

For RxJava 2.x see rxjava2-file.

Getting started

Add this maven dependency to your pom.xml:

<dependency>
  <groupId>com.github.davidmoten</groupId>
  <artifactId>rxjava-file</artifactId>
  <version>0.4.4</version>
</dependency>

How to build

git clone https://github.com/davidmoten/rxjava-file
cd rxjava-file
mvn clean install 

Examples

Tail a text file with NIO

Tail the lines of the text log file /var/log/server.log as an Observable<String>:

import com.github.davidmoten.rx.FileObservable;
import rx.Observable;
import java.io.File; 
 
Observable<String> items = 
     FileObservable.tailer()
                   .file("/var/log/server.log")
                   .startPosition(0)
                   .sampleTimeMs(500)
                   .chunkSize(8192)
                   .utf8()
                   .tailText();
                     

or, using defaults (will use default charset):

Observable<String> items = 
     FileObservable.tailer()
                   .file("/var/log/server.log")
                   .tailText();

Note that if you want the Observable<String> to be emitting line by line then wrap it with a call like StringObservable.split(observable, "\n"). StringObservable is in the RxJava rxjava-string artifact.
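The split matters if the emitted strings are arbitrary chunks that can end mid-line. Below is a minimal plain-JDK sketch of the buffering idea under that assumption. The class name `LineBuffer` is hypothetical and is not part of rxjava-file or rxjava-string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of rxjava-file or rxjava-string.
final class LineBuffer {
    // Holds text received after the most recent newline.
    private final StringBuilder partial = new StringBuilder();

    // Accepts the next chunk and returns every line that the chunk completes.
    List<String> accept(String chunk) {
        List<String> lines = new ArrayList<>();
        partial.append(chunk);
        int newline;
        while ((newline = partial.indexOf("\n")) >= 0) {
            lines.add(partial.substring(0, newline));
            partial.delete(0, newline + 1);
        }
        return lines;
    }

    // Returns the incomplete text that is still waiting for a newline.
    String remainder() {
        return partial.toString();
    }

    public static void main(String[] args) {
        LineBuffer buffer = new LineBuffer();
        System.out.println(buffer.accept("first li"));       // prints []
        System.out.println(buffer.accept("ne\nsecond\nth")); // prints [first line, second]
        System.out.println(buffer.remainder());              // prints th
    }
}
```

A chunk boundary that falls inside a line therefore does not produce a broken line; the fragment is held back until the rest arrives.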

Tail a text file without NIO

The above example uses a WatchService to generate WatchEvents to prompt rereads of the end of the file to perform the tail.

To use polling instead (say every 5 seconds):

Observable<String> items = 
     FileObservable.tailer()
                   .file(new File("/var/log/server.log"))
                   .source(Observable.interval(5, TimeUnit.SECONDS))
                   .tailText();
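Whichever trigger is used (WatchService events or an interval), each prompt amounts to rereading the file from the last known position. The following is a minimal plain-JDK sketch of that single step. `TailReader` and `readFrom` are hypothetical names, and this is not the library's implementation; the `maxBytes` parameter only mirrors the idea behind the `chunkSize` option.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not the library's implementation.
final class TailReader {

    // Reads up to maxBytes starting at position. Returns an empty array
    // when the file has not grown past position.
    static byte[] readFrom(Path file, long position, int maxBytes) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long available = raf.length() - position;
            if (available <= 0) {
                return new byte[0];
            }
            byte[] bytes = new byte[(int) Math.min(available, maxBytes)];
            raf.seek(position);
            raf.readFully(bytes);
            return bytes;
        }
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("tail-demo", ".log");
        try {
            Files.write(log, "one\n".getBytes(StandardCharsets.UTF_8));
            // The caller keeps track of how far it has read.
            long position = readFrom(log, 0, 8192).length;

            Files.write(log, "two\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
            byte[] appended = readFrom(log, position, 8192);
            // Prints only the appended text.
            System.out.print(new String(appended, StandardCharsets.UTF_8));
        } finally {
            Files.delete(log);
        }
    }
}
```

A polling tailer would call something like `readFrom` on every tick and advance its position by the number of bytes returned.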

Tail a binary file with NIO

Observable<byte[]> items = 
     FileObservable.tailer()
                   .file("/tmp/dump.bin")
                   .tail();

Tail a binary file without NIO

Observable<byte[]> items = 
     FileObservable.tailer()
                   .file("/tmp/dump.bin")
                   .source(Observable.interval(5, TimeUnit.SECONDS))
                   .tail();

rxjava-file's People

Contributors

davidmoten, enricsala, matthiasbalke

rxjava-file's Issues

Issue with chunk size

I am working on a web-based application to tail a log file. I set a chunkSize of 16384 and a startPosition of 5. My log file is around 38 MB in size, and the browser hangs as soon as I do so. I am using Jetty and PrimeFaces.

When I click the server, the text area displaying log content gets filled up with way more data than expected.

int displayChunkSize = 16384;
long fileSize = path.toFile().length();
long startPos = 5;
Observable<String> tailer = FileObservable
           .tailer()
           .file( path.toFile().getAbsolutePath() )
           .sampleTimeMs( 500 )
           .chunkSize( displayChunkSize )
           .startPosition( startPos )
           .tailText();
SafeSubscriber safeSub = new SafeSubscriber(new ActionSubscriber( 
    onMessage, onError, Actions.empty() ) );
Runnable sub = () -> tailer.subscribe( safeSub );

add onWatchStart action

For testing, at least, I need to know when the WatchService has started so that I can start writing to files and testing behaviour. I would prefer not to use time delays because they can make tests non-deterministic.

Existing lines are not emitted until the file is updated

The existing lines are emitted only when the file changes. This is unlike tail(1), which dumps the current content immediately.

Is there a way to achieve this or is there a missing flag?

Currently, I work around it with:

    FileObservable.tailer()
            .file(file)
            .tailText()
            .startWith(Observable.defer(() -> {
                try {
                    return Observable.from(Files.readAllLines(Paths.get(file)));
                } catch (IOException e) {
                    return Observable.error(e);
                }
            }));

(Note: I defer in order to be sure that I do not miss lines)

how to emit full path from watched directory?

@SuperEvenSteven

public class DirectoryEvent {
    public final File dir;
    public final WatchEvent<?> event;
    public DirectoryEvent(File dir, WatchEvent<?> event) {
        this.dir = dir;
        this.event = event;
    }
}

public static Observable<DirectoryEvent> events(File dir) {
    return  FileObservable.from(dir).map(event -> new DirectoryEvent(dir, event));
}
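Given the directory and the event, as in the DirectoryEvent pairing above, the full path can be obtained by resolving the event's context against the directory. For create, delete, and modify events the JDK documents the context as a `Path` relative to the watched directory. This is a minimal sketch; `WatchPaths.fullPath` is a hypothetical helper and not part of rxjava-file.

```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;

// Hypothetical helper for illustration; not part of rxjava-file.
final class WatchPaths {

    // Resolves the event's relative context against the watched directory.
    // Events without a Path context (such as OVERFLOW) yield the directory itself.
    static Path fullPath(File dir, WatchEvent<?> event) {
        Object context = event.context();
        if (context instanceof Path) {
            return dir.toPath().resolve((Path) context);
        }
        return dir.toPath();
    }

    public static void main(String[] args) {
        // A hand-built event stands in for one delivered by a WatchService.
        WatchEvent<Path> created = new WatchEvent<Path>() {
            @Override public WatchEvent.Kind<Path> kind() { return StandardWatchEventKinds.ENTRY_CREATE; }
            @Override public int count() { return 1; }
            @Override public Path context() { return Paths.get("server.log"); }
        };
        System.out.println(fullPath(new File("logs"), created));
    }
}
```

With this in place, a stream of DirectoryEvent values could be mapped to full paths via `fullPath(e.dir, e.event)`.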

Interaction with standard operators

I have the following setup:

    File file = new File(".\\server.log");
    Observable<String> newLines =
            FileObservable.tailer()
                    .file(file)
                    .startPosition(file.length())
                    .sampleTimeMs(1000)
                    .chunkSize(8192)
                    .utf8()
                    .tailText();

    newLines.subscribe(System.out::println);

and it works as expected.
But as soon as I try to chain some more operators, I get problems. For instance, changing to

  newLines.filter(LogfileWatcher::error).subscribe(System.out::println);

(where error() is a simple function String -> Boolean) I get output only after the first append to the file, but not the subsequent ones.
Similar problems appear when using window() or several other operators. For instance, using the byLine() function as suggested in the docs results in no output at all:

    Observable<String> newLines1 = StringObservable.split(newLines, "\n");
    newLines1.subscribe(System.out::println);

I'm very new to RX, so I don't know if I'm doing something wrong, though...

Android compatibility

Not all java.nio classes are available on Android:
java.lang.NoClassDefFoundError: Failed resolution of: [Ljava/nio/file/WatchEvent$Kind;

Would it be possible to make this platform independent?

OperatorWatchServiceEvents.java:84 blocks indefinitely

Hi, all tests but one fail for me. I've debugged the code, and it hangs on the watchService.take() call. I guess an event is never delivered on my system, which I understand to be platform dependent. FYI, I'm running OS X Yosemite.

Watcher lives on after all subscribers have unsubscribed

After

    Observable<String> items = FileObservable.tailer()
            .file("/test.log")
            .startPosition(0)
            .sampleTimeMs(500)
            .chunkSize(8192)
            .utf8()
            .tailText();
    Subscription subItem = items.subscribeOn(Schedulers.newThread()).subscribe(s -> {
        System.out.println(s);
    });
    subItem.unsubscribe();

A thread named RxIoScheduler-1 (Evictor) seems to live on for as long as the process runs, instead of dying off once there are no subscriptions.

Am I missing something obvious?

Support for latest RxJava 2.x

It's been almost 2 years since the v2.x release of RxJava, and it is also more widely adopted than 1.x.

It would be great if this library supported the latest version of RxJava.

StringObservable required?

Hi,

first: thanks for your great work!

In the documentation you note:

[...] if you want the Observable to be emitting line by line then wrap it with a call like StringObservable.split(observable, "\n"). [...]

Is this still required? I saw in the source code that tailText calls a function "toLines". Does this call do something different? Why is StringObservable required?

Update for backpressure

The code needs review and an update to handle the introduction of backpressure in RxJava, which happened ages ago.

FileObservable.tailFile should detect Delete events

If a file is removed or deleted (say, during a log file rollover) and immediately recreated and written to, so that its length is greater than the original length, then we want all the new lines to be streamed. To do this, OperatorFileTailer needs to detect file deletion events.
