Giter Site home page Giter Site logo

Filtering of `listen()` about crystal HOT 2 OPEN

benjie avatar benjie commented on July 24, 2024
Filtering of `listen()`

from crystal.

Comments (2)

1mehal avatar 1mehal commented on July 24, 2024 1

@benjie I would give it a try

from crystal.

1mehal avatar 1mehal commented on July 24, 2024 1

@benjie I got it to work for the SimpleSubscription plugin with the following code:

`
/**
 * Subscribes to the given `pubsubOrPlan` to get realtime updates on a given
 * topic (`topicOrPlan`), mapping the resulting event via the `itemPlan`
 * callback. When a `filterFunc` is supplied, only events for which it returns
 * `true` are yielded to the stream; all other events are silently skipped.
 *
 * This step can only be streamed; calling `execute()` always throws.
 */
export class FilteredListenStep<
    TTopics extends { [topic: string]: any },
    TTopic extends keyof TTopics,
    TPayloadStep extends ExecutableStep
  >
  extends ExecutableStep<TTopics[TTopic]>
  implements StreamableStep<TTopics[TTopic]>
{
  // NOTE(review): this still advertises grafast's `ListenStep` as the export
  // target, which looks like it was carried over from the step this class was
  // modelled on — confirm whether an exported schema should reference this
  // class instead.
  static $$export = {
    moduleName: "grafast",
    exportName: "ListenStep",
  };
  isSyncAndSafe = true;

  /**
   * Dependency index of the step that provides the `GrafastSubscriber`
   * (pubsub) we subscribe through.
   */
  private pubsubDep: number;

  /**
   * Dependency index of the step that tells us which topic we're subscribing
   * to.
   */
  private topicDep: number;

  /**
   * Dependency index of the step that provides the event type handed to
   * `filterFunc` as its second argument (`null` when no step was given).
   */
  private eventTypeDep: number;

  /**
   * The event type step exactly as passed to the constructor, if any.
   */
  private eventTypePlan?: ExecutableStep<string | null>;

  /**
   * Optional predicate deciding whether an event is yielded; `null` means
   * every event passes through.
   */
  private filterFunc:
    | ((item: TTopics[TTopic], eventType: string | null) => boolean)
    | null;

  /**
   * @param pubsubOrPlan - The subscriber, or a step yielding it. A `null`
   *   subscriber is accepted here but makes `stream()` throw.
   * @param topicOrPlan - The topic name, or a step yielding it.
   * @param itemPlan - Maps the per-event item step to the payload step;
   *   defaults to returning the item step unchanged.
   * @param eventTypePlan - Optional step yielding the event type that is
   *   passed to `filterFunc`; when omitted the filter receives `null`.
   * @param filterFunc - Optional predicate; events for which it returns
   *   `false` are dropped from the stream.
   */
  constructor(
    pubsubOrPlan:
      | ExecutableStep<GrafastSubscriber<TTopics> | null>
      | GrafastSubscriber<TTopics>
      | null,
    topicOrPlan: ExecutableStep<TTopic> | string,
    public itemPlan: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep = (
      $item
    ) => $item as any,
    eventTypePlan?: ExecutableStep<string | null>,
    filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
  ) {
    super();
    const $topic =
      typeof topicOrPlan === "string" ? constant(topicOrPlan) : topicOrPlan;
    const $pubsub = isExecutableStep(pubsubOrPlan)
      ? pubsubOrPlan
      : constant(pubsubOrPlan, false);
    const $eventType = eventTypePlan ?? constant(null);
    this.pubsubDep = this.addDependency($pubsub);
    this.topicDep = this.addDependency($topic);
    this.eventTypeDep = this.addDependency($eventType);
    this.eventTypePlan = eventTypePlan;
    this.filterFunc = filterFunc ?? null;
  }

  /**
   * Always throws: this step has no one-shot result.
   *
   * @throws Error unconditionally.
   */
  execute(): never {
    throw new Error("ListenStep cannot be executed, it can only be streamed");
  }

  /**
   * Builds one async generator per batch entry. Each generator subscribes to
   * the entry's topic on first iteration and yields the events that pass the
   * filter (or every event when no filter was configured).
   *
   * @param count - Number of entries in the batch.
   * @param values - Dependency values, indexed by the dependency indexes
   *   recorded in the constructor.
   * @returns A list with one event stream per batch entry.
   * @throws SafeError if any entry has no subscriber.
   */
  stream(
    count: number,
    values: readonly [
      GrafastValuesList<GrafastSubscriber<TTopics>>,
      GrafastValuesList<TTopic>,
      GrafastValuesList<string | null>
    ]
  ): GrafastResultStreamList<TTopics[TTopic]> {
    const pubsubs = values[this.pubsubDep as 0];
    const topics = values[this.topicDep as 1];
    const eventTypes = values[this.eventTypeDep as 2];
    // Read the filter once so the generators below close over a plain local
    // rather than relying on `this` inside a function expression (which is
    // implicitly `any` under `noImplicitThis` and needed a `.bind(this)`).
    const filterFunc = this.filterFunc;
    const result: Array<AsyncGenerator<TTopics[TTopic], void, undefined>> = [];
    for (let i = 0; i < count; i++) {
      const pubsub = pubsubs[i];
      if (!pubsub) {
        throw new SafeError(
          "Subscription not supported",
          isDev
            ? {
                hint: `${
                  this.dependencies[this.pubsubDep]
                } did not provide a GrafastSubscriber; perhaps you forgot to add the relevant property to context?`,
              }
            : {}
        );
      }
      const topic = topics[i];
      const eventType = eventTypes[i]; // Actual value of eventType
      result[i] = (async function* () {
        // The subscription is only opened once the consumer starts iterating.
        const subscription = pubsub.subscribe(topic);
        const asyncIterable =
          subscription instanceof Promise ? await subscription : subscription;
        for await (const item of asyncIterable) {
          if (!filterFunc || filterFunc(item, eventType)) {
            yield item;
          }
        }
      })();
    }
    return result;
  }
}

/**
 * Convenience factory for `FilteredListenStep`: subscribes to a topic on the
 * given pubsub and, when `filterFunc` is provided, only lets through the
 * events that the predicate accepts.
 *
 * @param pubsubOrPlan - The subscriber, or a step yielding it.
 * @param topicOrPlan - The topic name, or a step yielding it.
 * @param itemPlan - Optional mapper from the per-event item step to the
 *   payload step.
 * @param eventTypePlan - Optional step yielding the event type that is handed
 *   to `filterFunc` as its second argument.
 * @param filterFunc - Optional predicate deciding which events are yielded.
 * @returns The newly constructed `FilteredListenStep`.
 */
export function filteredListen<
  TTopics extends { [topic: string]: any },
  TTopic extends keyof TTopics,
  TPayloadStep extends ExecutableStep
>(
  pubsubOrPlan:
    | ExecutableStep<GrafastSubscriber<TTopics> | null>
    | GrafastSubscriber<TTopics>
    | null,
  topicOrPlan: ExecutableStep<TTopic> | string,
  itemPlan?: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep,
  eventTypePlan?: ExecutableStep<string | null>,
  filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
): FilteredListenStep<TTopics, TTopic, TPayloadStep> {
  // All arguments are forwarded positionally and untouched.
  const $filteredListen = new FilteredListenStep<
    TTopics,
    TTopic,
    TPayloadStep
  >(pubsubOrPlan, topicOrPlan, itemPlan, eventTypePlan, filterFunc);
  return $filteredListen;
}

/**
 * Schema extension adding a `Subscription.listen(topic, eventType)` field.
 *
 * The field subscribes to the `postgraphile:<topic>` channel on the
 * `pgSubscriber` found in context and, when `eventType` is supplied, only
 * forwards events whose JSON payload has a matching `event` property. The
 * `relatedNode` / `relatedNodeId` fields are only added when the build exposes
 * node ID handlers.
 */
const SimpleSubscriptionsPlugin = makeExtendSchemaPlugin((build) => {
  const nodeIdHandlerByTypeName = build.getNodeIdHandlerByTypeName?.();
  return {
    typeDefs: [
      gql`
        extend type Subscription {
          listen(topic: String!, eventType: String): ListenPayload
        }
        type ListenPayload {
          event: String
        }
      `,
      // Only add the relatedNode if supported
      ...(nodeIdHandlerByTypeName
        ? [
            gql`
              extend type ListenPayload {
                relatedNode: Node
                relatedNodeId: ID
              }
            `,
          ]
        : []),
    ],
    plans: {
      Subscription: {
        listen: {
          subscribePlan: EXPORTABLE(
            // The factory parameter must be named after the dependency it
            // receives; otherwise the body would reach `filteredListen`
            // through the enclosing scope instead of through the declared
            // dependency list.
            (context, jsonParse, lambda, filteredListen) =>
              function subscribePlan(_$root, { $topic, $eventType = null }) {
                const $pgSubscriber = context().get("pgSubscriber");
                const $derivedTopic = lambda($topic, (topic) => {
                  return `postgraphile:${topic}`;
                });
                const eventTypePlan = lambda(
                  $eventType,
                  (eventTypeValue) => eventTypeValue
                );
                /**
                 * Decides whether a raw event should reach the subscriber.
                 * With no event type requested everything passes; otherwise
                 * the payload's `event` property must equal the requested
                 * type.
                 */
                const filterFunction = (
                  item: unknown,
                  eventTypeValue: string | null
                ): boolean => {
                  if (!eventTypeValue) {
                    // No filter requested: skip the parse entirely.
                    return true;
                  }
                  try {
                    const parsedItem: unknown = JSON.parse(String(item));
                    return (
                      typeof parsedItem === "object" &&
                      parsedItem !== null &&
                      (parsedItem as { event?: unknown }).event ===
                        eventTypeValue
                    );
                  } catch {
                    // A payload that is not valid JSON cannot match the
                    // requested event type; drop it rather than letting the
                    // exception terminate the whole stream.
                    return false;
                  }
                };

                return filteredListen(
                  $pgSubscriber,
                  $derivedTopic,
                  jsonParse,
                  eventTypePlan,
                  filterFunction
                );
              },
            [context, jsonParse, lambda, filteredListen]
          ),
          plan: EXPORTABLE(
            () =>
              function plan($event) {
                return $event;
              },
            []
          ),
        },
      },
      ListenPayload: {
        event($event) {
          return $event.get("event");
        },
        ...(nodeIdHandlerByTypeName
          ? {
              relatedNodeId($event) {
                return nodeIdFromEvent($event);
              },
              relatedNode($event) {
                const $nodeId = nodeIdFromEvent($event);
                return node(nodeIdHandlerByTypeName, $nodeId);
              },
            }
          : null),
      },
    },
  };
});

from crystal.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Something interesting about the web. A new door to the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Something interesting about games, to make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.