
Comments (15)

jexp commented on May 27, 2024

Can you try this with:

MATCH (user:User) WHERE id(user) = {id}
MATCH (user)-[:BOUGHT]->(product)
WITH user,collect(product) as products
CALL apoc.cypher.mapParallel("MATCH (_)<-[:BOUGHT]-(other)-[:BOUGHT]->(reco:Product) USING SCAN reco:Product USING JOIN ON other RETURN reco,count(*) as freq",null,products) yield value
RETURN id(value.reco) as reco, sum(value.freq) as score
ORDER BY score DESC LIMIT 10;

from neo4j-apoc-procedures.

jexp commented on May 27, 2024

Ping @ikwattro ?

ikwattro commented on May 27, 2024

Didn't have time to test it yet with apoc + this current dataset. Will ping during the week

jexp commented on May 27, 2024

Did you test it? Besides apoc.cypher.*, there are now also apoc.periodic.iterate and apoc.periodic.commit.

ikwattro commented on May 27, 2024

@jexp Just tested with the latest version of APOC and Neo4j 3.0.3:

MATCH (u:User) WHERE u.id = 463
MATCH (u)-[:LIKES]->(movie)
WITH u, collect(movie) as movies
CALL apoc.cypher.mapParallel("MATCH (_)<-[:LIKES]-(other)-[:LIKES]->(reco:Movie) USING SCAN reco:Movie USING JOIN ON other RETURN reco,count(*) as freq",{},movies) yield value
RETURN id(value.reco) as reco, sum(value.freq) as score
ORDER BY score DESC LIMIT 10;

The CPU is burning and the query never returns. Lots of GC pauses in the logs.

ikwattro commented on May 27, 2024

The same query in plain Cypher takes 350 ms:

MATCH (u:User) WHERE u.id = 463
MATCH (u)-[:LIKES]->(movie)<-[:LIKES]-(other)
WITH distinct other
MATCH (other)-[:LIKES]->(movie)
RETURN movie as reco, count(*) as score
ORDER BY score DESC LIMIT 10

ikwattro commented on May 27, 2024

@jexp I have a custom procedure just for this use case; with it the query takes 88 ms:

@Procedure
    public Stream<Recommendation> cf(
            @Name("input") Node input,
            @Name("relType") String relType,
            @Name("direction") String direction,
            @Name("maxItemDegree") Object maxItemDegree,
            @Name("maxCollabDegree") Object maxCollabDegree,
            @Name("maxRecoDegree") Object maxRecoDegree
            ) {
        RelationshipType relationshipType = RelationshipType.withName(relType);
        Direction firstDirection = resolveDirection(direction);
        Direction otherDirection = reverse(firstDirection);
        // Cypher passes integers as Long, so go through Number rather than casting to int directly.
        int itemDegree = null != maxItemDegree ? toIntExact(((Number) maxItemDegree).longValue()) : DEFAULT_MAX_ITEM_DEGREE;
        int collabDegree = null != maxCollabDegree ? toIntExact(((Number) maxCollabDegree).longValue()) : DEFAULT_MAX_COLLAB_DEGREE;
        int recoDegree = null != maxRecoDegree ? toIntExact(((Number) maxRecoDegree).longValue()) : DEFAULT_MAX_RECO_DEGREE;
        ConcurrentHashMap<Long, ScoredItem> scoredItems = new ConcurrentHashMap<>();

        try {
            List<Worker> level2Workers = new ArrayList<>();
            List<Long> blacklist = new ArrayList<>();
            // A Set keeps the "already seen" check O(1) per collaborator.
            Set<Long> collabs = new HashSet<>();
            for (Relationship rel : input.getRelationships(relationshipType, firstDirection)) {
                Node item = rel.getOtherNode(input);
                blacklist.add(item.getId());
                if (checkDegree(item, relationshipType, Direction.BOTH, itemDegree*2)) {
                    for (Relationship rel2 : item.getRelationships(relationshipType, otherDirection)) {
                        Node collab = rel2.getOtherNode(item);
                        if (!collabs.contains(collab.getId()) && checkDegree(collab, relationshipType, Direction.BOTH, collabDegree*2)) {
                            collabs.add(collab.getId());
                            level2Workers.add(new Worker(api, collab, relationshipType, firstDirection, recoDegree, blacklist));
                        }
                    }
                }
            }

            level2Workers.parallelStream().forEach(new Consumer<Worker>() {
                @Override
                public void accept(Worker worker) {
                    worker.traverse().forEach(n -> {
                        if (!blacklist.contains(n.getId())) {
                            // computeIfAbsent is atomic, so parallel workers cannot overwrite each other's entry
                            // (assumes ScoredItem.increment is itself thread-safe).
                            scoredItems.computeIfAbsent(n.getId(), id -> new ScoredItem(n, 0)).increment(1);
                        }
                    });
                }
            });
        } catch (Exception e) {
            // Keep the original exception as the cause instead of only its message.
            throw new RuntimeException(e.getMessage(), e);
        }

        // Highest score first.
        return scoredItems.values().stream()
                .sorted((s1, s2) -> Integer.compare(s2.score.intValue(), s1.score.intValue()))
                .map(s -> new Recommendation(s.node, s.score.get()));
    }
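
One thing to watch when counting inside parallelStream().forEach, as the procedure above does: a containsKey / get / put sequence is not atomic, even on a ConcurrentHashMap, so two threads can both miss the key and one entry overwrites the other. Below is a minimal, self-contained sketch of an atomic alternative using ConcurrentHashMap.merge. The class name ParallelScoreCount, the countScores helper and the sample ids are made up for illustration and are not part of the procedure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ParallelScoreCount {

    // Counts how often each id occurs across all batches, processing batches in parallel.
    // merge() performs the "insert 1 or add 1" step atomically per key.
    static Map<Long, Integer> countScores(List<List<Long>> batches) {
        ConcurrentHashMap<Long, Integer> scores = new ConcurrentHashMap<>();
        batches.parallelStream().forEach(batch ->
                batch.forEach(id -> scores.merge(id, 1, Integer::sum)));
        return scores;
    }

    public static void main(String[] args) {
        // Hypothetical data: each inner list stands for the items one worker found.
        List<List<Long>> batches = Arrays.asList(
                Arrays.asList(1L, 2L, 3L),
                Arrays.asList(2L, 3L),
                Arrays.asList(3L));
        System.out.println(countScores(batches));
    }
}
```

The same idea carries over to a map of mutable score objects: create the entry with computeIfAbsent and increment an atomic counter inside it.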

jexp commented on May 27, 2024

Yes, I have to revisit it and switch to pure streaming.

Also, the expensive expand is the one from other to reco,

so you could try running only that part in parallel:

MATCH (u:User) WHERE u.id = 463
MATCH (u)-[:LIKES]->(movie)<-[:LIKES]-(other)
WITH u, collect(distinct other) as others
CALL apoc.cypher.mapParallel("MATCH (_)-[:LIKES]->(reco:Movie) RETURN id(reco) as reco,count(*) as freq",{},others) yield value
RETURN value.reco as reco, sum(value.freq) as score
ORDER BY score DESC LIMIT 10;

ikwattro commented on May 27, 2024

Still takes 500+ ms after 10 runs:

neo4j-sh (?)$ MATCH (u:User) WHERE u.id = 463
> MATCH (u)-[:LIKES]->(movie)<-[:LIKES]-(other)
> WITH u, collect(distinct other) as others
> CALL apoc.cypher.mapParallel("MATCH (_)-[:LIKES]->(reco:Movie) RETURN id(reco) as reco,count(*) as freq",{},others) yield value
> RETURN value.reco as reco, sum(value.freq) as score
> ORDER BY score DESC LIMIT 10;
+--------------+
| reco | score |
+--------------+
| 4955 | 635   |
| 5347 | 485   |
| 5597 | 478   |
| 5392 | 466   |
| 5218 | 441   |
| 5479 | 427   |
| 5358 | 425   |
| 5245 | 404   |
| 4949 | 402   |
| 5176 | 402   |
+--------------+
10 rows
580 ms

ikwattro commented on May 27, 2024

Interestingly, I don't get the same scores from the two recommendation queries.

ikwattro commented on May 27, 2024

OK, that can happen, because I filter out some nodes with a high LIKES degree.

ikwattro commented on May 27, 2024

@jexp There seems to be an issue with your procedure as well: the first movie reco returned by both implementations is movie id 450, node id 4955.

As you can see in the previous shell output, mapParallel returns a score of 635, whereas this node has only 466 incoming LIKES relationships :/
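
The reasoning behind that check: if the collaborators are distinct and each (user, movie) pair has at most one LIKES relationship, every collaborator adds at most 1 to a movie's score, so the score cannot exceed the movie's incoming LIKES count. The sketch below illustrates that bound on a tiny in-memory graph. The class CoLikeBound, its method names, the map layout and the sample users and movies are all hypothetical stand-ins for the real dataset.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CoLikeBound {

    // likes maps each user to the set of movies they like; using a Set means
    // there is at most one LIKES edge per (user, movie) pair.
    static Map<String, Integer> scores(Map<String, Set<String>> likes, String user) {
        Set<String> seed = likes.getOrDefault(user, Collections.emptySet());

        // Distinct collaborators: other users who share at least one movie with `user`.
        Set<String> others = new HashSet<>();
        for (Map.Entry<String, Set<String>> e : likes.entrySet()) {
            if (!e.getKey().equals(user) && !Collections.disjoint(e.getValue(), seed)) {
                others.add(e.getKey());
            }
        }

        // Each collaborator adds exactly 1 to every movie they like.
        Map<String, Integer> score = new HashMap<>();
        for (String other : others) {
            for (String movie : likes.get(other)) {
                score.merge(movie, 1, Integer::sum);
            }
        }
        return score;
    }

    // Number of incoming LIKES per movie over the whole graph.
    static Map<String, Integer> inDegree(Map<String, Set<String>> likes) {
        Map<String, Integer> degree = new HashMap<>();
        for (Set<String> movies : likes.values()) {
            for (String movie : movies) {
                degree.merge(movie, 1, Integer::sum);
            }
        }
        return degree;
    }

    // True if no movie scores higher than its incoming LIKES count.
    static boolean withinBound(Map<String, Integer> score, Map<String, Integer> degree) {
        return score.entrySet().stream()
                .allMatch(e -> e.getValue() <= degree.getOrDefault(e.getKey(), 0));
    }

    public static void main(String[] args) {
        Map<String, Set<String>> likes = new HashMap<>();
        likes.put("alice", new HashSet<>(Arrays.asList("m1", "m2")));
        likes.put("bob", new HashSet<>(Arrays.asList("m1", "m3")));
        likes.put("carol", new HashSet<>(Arrays.asList("m2", "m3", "m4")));
        Map<String, Integer> score = scores(likes, "alice");
        System.out.println(score + " within bound: " + withinBound(score, inDegree(likes)));
    }
}
```

When the bound is violated on real data, as with 635 versus 466 here, plausible causes include duplicate rows reaching the aggregation or parallel LIKES relationships between the same pair; which one applies is not established in this thread.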

pradeepponduri commented on May 27, 2024

@jexp
Are there any configuration settings that make apoc.cypher.parallel run faster, such as increasing the number of threads?

ponduripradeep commented on May 27, 2024

What is the difference between parallel2 and mapParallel?

fbiville commented on May 27, 2024

Feel free to open the issue if you need to clarify how some of the parallel procedures work.
