Comments (2)
@benjie I would give it a try
from crystal.
@benjie I got it to work for the SimpleSubscriptions plugin with the following code:
`
// Imports assumed for PostGraphile V5 / Grafast; exact module paths may differ
// between versions, so treat these as a starting point.
import { jsonParse } from "@dataplan/json";
import type {
GrafastResultStreamList,
GrafastSubscriber,
GrafastValuesList,
StreamableStep,
} from "grafast";
import {
__ItemStep,
constant,
context,
ExecutableStep,
isDev,
isExecutableStep,
lambda,
node,
SafeError,
} from "grafast";
import { EXPORTABLE, gql, makeExtendSchemaPlugin } from "graphile-utils";
// `nodeIdFromEvent` is assumed to be the helper from the original plugin
// source; it is not shown here.
/**
* Subscribes to the given `pubsubOrPlan` to get realtime updates on a given
* topic (`topicOrPlan`), mapping the resulting event via the `itemPlan`
* callback. Unlike `ListenStep`, an event is only yielded when the optional
* `filterFunc` accepts it.
*/
export class FilteredListenStep<
TTopics extends { [topic: string]: any },
TTopic extends keyof TTopics,
TPayloadStep extends ExecutableStep
>
extends ExecutableStep<TTopics[TTopic]>
implements StreamableStep<TTopics[TTopic]>
{
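// NOTE: copied from ListenStep. This class is not part of "grafast", so an
// exported schema would resolve to the unfiltered ListenStep; adjust or remove.
static $$export = {
moduleName: "grafast",
exportName: "ListenStep",
};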
isSyncAndSafe = true;
/**
* The id for the PostgreSQL context plan.
*/
private pubsubDep: number;
/**
* The plan that will tell us which topic we're subscribing to.
*/
private topicDep: number;
private eventTypeDep: number;
private eventTypePlan?: ExecutableStep<string | null>;
// Optional predicate that decides whether an event is yielded to the subscriber
private filterFunc:
| ((item: TTopics[TTopic], eventType: string | null) => boolean)
| null;
constructor(
pubsubOrPlan:
| ExecutableStep<GrafastSubscriber<TTopics> | null>
| GrafastSubscriber<TTopics>
| null,
topicOrPlan: ExecutableStep<TTopic> | string,
public itemPlan: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep = (
$item
) => $item as any,
eventTypePlan?: ExecutableStep<string | null>,
filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
) {
super();
const $topic =
typeof topicOrPlan === "string" ? constant(topicOrPlan) : topicOrPlan;
const $pubsub = isExecutableStep(pubsubOrPlan)
? pubsubOrPlan
: constant(pubsubOrPlan, false);
const $eventType = eventTypePlan || constant(null);
this.pubsubDep = this.addDependency($pubsub);
this.topicDep = this.addDependency($topic);
this.eventTypeDep = this.addDependency($eventType);
this.eventTypePlan = eventTypePlan;
this.filterFunc = filterFunc || null;
}
execute(): never {
throw new Error(
"FilteredListenStep cannot be executed, it can only be streamed"
);
}
stream(
count: number,
values: readonly [
GrafastValuesList<GrafastSubscriber<TTopics>>,
GrafastValuesList<TTopic>,
GrafastValuesList<string | null>
]
): GrafastResultStreamList<TTopics[TTopic]> {
const pubsubs = values[this.pubsubDep as 0];
const topics = values[this.topicDep as 1];
const eventTypes = values[this.eventTypeDep as 2];
const result = [];
for (let i = 0; i < count; i++) {
const pubsub = pubsubs[i];
if (!pubsub) {
throw new SafeError(
"Subscription not supported",
isDev
? {
hint: `${
this.dependencies[this.pubsubDep]
} did not provide a GrafastSubscriber; perhaps you forgot to add the relevant property to context?`,
}
: {}
);
}
const topic = topics[i];
const eventType = eventTypes[i]; // Actual value of eventType
// Capture the filter up front so the generator does not depend on `this`
const filterFunc = this.filterFunc;
result[i] = (async function* () {
const subscription = pubsub.subscribe(topic);
const asyncIterable =
subscription instanceof Promise ? await subscription : subscription;
for await (const item of asyncIterable) {
if (!filterFunc || filterFunc(item, eventType)) {
yield item;
}
}
})();
}
return result;
}
}
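/**
* Convenience wrapper that creates a `FilteredListenStep`: subscribes to the
* given `pubsubOrPlan` to get realtime updates on a given topic
* (`topicOrPlan`), drops events rejected by `filterFunc`, and maps the
* remaining events via the `itemPlan` callback.
*/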
export function filteredListen<
TTopics extends { [topic: string]: any },
TTopic extends keyof TTopics,
TPayloadStep extends ExecutableStep
>(
pubsubOrPlan:
| ExecutableStep<GrafastSubscriber<TTopics> | null>
| GrafastSubscriber<TTopics>
| null,
topicOrPlan: ExecutableStep<TTopic> | string,
itemPlan?: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep,
eventTypePlan?: ExecutableStep<string | null>, // Step yielding the event type to filter on
filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
): FilteredListenStep<TTopics, TTopic, TPayloadStep> {
return new FilteredListenStep<TTopics, TTopic, TPayloadStep>(
pubsubOrPlan,
topicOrPlan,
itemPlan,
eventTypePlan,
filterFunc
);
}
const SimpleSubscriptionsPlugin = makeExtendSchemaPlugin((build) => {
const nodeIdHandlerByTypeName = build.getNodeIdHandlerByTypeName?.();
return {
typeDefs: [
gql`
extend type Subscription {
listen(topic: String!, eventType: String): ListenPayload
}
type ListenPayload {
event: String
}
`,
// Only add the relatedNode if supported
...(nodeIdHandlerByTypeName
? [
gql`
extend type ListenPayload {
relatedNode: Node
relatedNodeId: ID
}
`,
]
: []),
],
plans: {
Subscription: {
listen: {
subscribePlan: EXPORTABLE(
(context, jsonParse, lambda, filteredListen) =>
function subscribePlan(_$root, { $topic, $eventType = null }) {
const $pgSubscriber = context().get("pgSubscriber");
const $derivedTopic = lambda($topic, (topic) => {
return `postgraphile:${topic}`;
});
const eventTypePlan = lambda(
$eventType,
(eventTypeValue) => eventTypeValue
);
const filterFunction = (item: any, eventTypeValue: any) => {
const parsedItem = JSON.parse(item);
return !eventTypeValue || parsedItem.event === eventTypeValue;
};
return filteredListen(
$pgSubscriber,
$derivedTopic,
jsonParse,
eventTypePlan,
filterFunction
);
},
[context, jsonParse, lambda, filteredListen]
),
plan: EXPORTABLE(
() =>
function plan($event) {
return $event;
},
[]
),
},
},
ListenPayload: {
event($event) {
return $event.get("event");
},
...(nodeIdHandlerByTypeName
? {
relatedNodeId($event) {
return nodeIdFromEvent($event);
},
relatedNode($event) {
const $nodeId = nodeIdFromEvent($event);
return node(nodeIdHandlerByTypeName, $nodeId);
},
}
: null),
},
},
};
});
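For anyone who wants to try the filtering behaviour without a database, here is a minimal standalone sketch of the same idea: wrap an async iterable and only yield the items a predicate accepts. The names `matchesEventType`, `filterAsync`, `fakeSubscription` and `collect` are hypothetical and not part of Grafast or PostGraphile; `fakeSubscription` only stands in for `pubsub.subscribe(topic)`, and the payload shape is an assumption.

```typescript
type Predicate<T> = (item: T, eventType: string | null) => boolean;

// Same check as `filterFunction` above: let everything through when no event
// type was requested, otherwise require the payload's `event` field to match.
function matchesEventType(rawPayload: string, eventType: string | null): boolean {
  if (!eventType) return true;
  const parsed = JSON.parse(rawPayload) as { event?: unknown };
  return parsed.event === eventType;
}

// Wraps any async iterable and yields only the items accepted by `predicate`.
// A null predicate disables filtering, mirroring the optional `filterFunc`.
async function* filterAsync<T>(
  source: AsyncIterable<T>,
  eventType: string | null,
  predicate: Predicate<T> | null
): AsyncGenerator<T> {
  for await (const item of source) {
    if (!predicate || predicate(item, eventType)) {
      yield item;
    }
  }
}

// Hypothetical stand-in for a pubsub subscription that emits raw JSON strings.
async function* fakeSubscription(): AsyncGenerator<string> {
  yield JSON.stringify({ event: "created", id: 1 });
  yield JSON.stringify({ event: "deleted", id: 2 });
  yield JSON.stringify({ event: "created", id: 3 });
}

// Drains an async iterable into an array (only sensible for finite sources).
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) {
    items.push(item);
  }
  return items;
}
```

Calling `collect(filterAsync(fakeSubscription(), "created", matchesEventType))` should resolve to the two `created` payloads, while passing `null` as the event type lets all three through. Note that the predicate sees the raw string, which is why it has to parse the JSON itself; the parsed mapping via `jsonParse` happens later, on the items that survive the filter.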