Comments (2)

shikhar commented on July 18, 2024

What's the type of the timestamp field in your source data?

from kafka-connect-elasticsearch.

boeboe commented on July 18, 2024

@shikhar, your question made me recheck my Slack source connector, which was indeed using Schema.STRING_SCHEMA for the timestamp field instead of Timestamp.SCHEMA.

Full sources for reference...

package com.example.connect.slack;

import com.ullink.slack.simpleslackapi.SlackSession;
import com.ullink.slack.simpleslackapi.events.SlackMessagePosted;
import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory;
import com.ullink.slack.simpleslackapi.listeners.SlackMessagePostedListener;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by boeboe on 24/10/16.
 */
public class SlackSourceTask extends SourceTask implements SlackMessagePostedListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(SlackSourceTask.class);

    private SlackSourceConnectorConfig config;
    private SlackSession slackSession;
    private BlockingQueue<SourceRecord> queue = new LinkedBlockingQueue<>();

    private String topic = "";
    private Schema keySchema = SchemaBuilder.struct().name("com.slack.key")
            .field("channel", Schema.STRING_SCHEMA)
            .build();
    private Schema valueSchema = SchemaBuilder.struct().name("com.slack.message")
            .field("user", Schema.STRING_SCHEMA)
            .field("channel", Schema.STRING_SCHEMA)
            .field("text", Schema.STRING_SCHEMA)
            .field("timestamp", Timestamp.SCHEMA)
            .build();


    @Override
    public void start(Map<String, String> props) {
        LOGGER.info("Start a SlackSourceTask");

        config = new SlackSourceConnectorConfig(props);
        topic = config.getString(SlackSourceConnectorConfig.KAFKA_TOPIC_CONFIG);

        String apiToken = config.getString(SlackSourceConnectorConfig.SLACK_API_TOKEN_CONFIG);
        LOGGER.info("Going to connect to slack");
        LOGGER.debug("APIToken in use: '{}'", apiToken);
        try {
            slackSession = SlackSessionFactory.createWebSocketSlackSession(apiToken);
            slackSession.connect();
        } catch (IOException e) {
            LOGGER.error("Couldn't open a connection to slack");
            throw new ConnectException(e);
        }

        slackSession.addMessagePostedListener(this);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        SourceRecord record = queue.take();

        LOGGER.info("Polling new data from queue: {}", record.toString());

        Collections.addAll(records, record);
        return records;
    }

    @Override
    public void stop() {
        LOGGER.info("Stopping the SlackSourceTask");
        try {
            slackSession.removeMessagePostedListener(this);
            LOGGER.info("Removed slack post message listener");

            slackSession.disconnect();
            LOGGER.info("Disconnected from slack");
        } catch (IOException e) {
            LOGGER.error("Couldn't disconnect from slack");
            throw new ConnectException(e);
        }
    }

    @Override
    public String version() {
        return Version.getVersion();
    }

    @Override
    public void onEvent(SlackMessagePosted event, SlackSession session) {
        LOGGER.info("New slack message received '{}' on channel '{}'", event.getMessageContent(),
                event.getChannel().getName());

        // example timestring is '1469470591.759709' (microsecond granularity)
        String timestring = event.getTimestamp();
        Long timestamp = Long.valueOf(timestring.replace(".", "")) / 1000;
        String channel = event.getChannel().getName();

        SourceRecord record = new SourceRecord(
                Collections.singletonMap("channel", channel),
                Collections.singletonMap("position", timestring),
                topic,
                null,
                keySchema,
                new Struct(keySchema).put("channel", channel),
                valueSchema,
                new Struct(valueSchema)
                        .put("user", event.getSender().getUserName())
                        .put("channel", channel)
                        .put("text", event.getMessageContent())
                        .put("timestamp", Timestamp.toLogical(Timestamp.SCHEMA, timestamp))
        );

        queue.add(record);
    }
}
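
As a side note on the timestamp conversion in onEvent: stripping the dot and dividing by 1000 only yields epoch milliseconds when the Slack timestring carries exactly six fractional digits, as in the example '1469470591.759709'. Below is a minimal, standalone sketch of a conversion that does not depend on the digit count. The class and method names (SlackTimestamps, toEpochMillis, toDate) are hypothetical helpers for illustration and are not part of the connector above or of any Slack library; the resulting java.util.Date is the kind of value a Timestamp.SCHEMA field expects.

```java
import java.math.BigDecimal;
import java.util.Date;

// Hypothetical helper, not part of the original connector.
class SlackTimestamps {

    // Converts a "seconds.fraction" timestring to epoch milliseconds.
    // Any digits beyond millisecond precision are truncated.
    static long toEpochMillis(String timestring) {
        return new BigDecimal(timestring).movePointRight(3).longValue();
    }

    // Wraps the epoch milliseconds in a java.util.Date.
    static Date toDate(String timestring) {
        return new Date(toEpochMillis(timestring));
    }

    public static void main(String[] args) {
        // prints 1469470591759
        System.out.println(toEpochMillis("1469470591.759709"));
    }
}
```

With a helper like this, a timestring without a fractional part (for example '1469470591') would still map to whole seconds in milliseconds rather than being divided down to a much smaller number.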

Resulting Elasticsearch index...

{
    "slack": {
        "aliases": {},
        "mappings": {
            "slack": {
                "properties": {
                    "channel": {
                        "type": "string"
                    },
                    "text": {
                        "type": "string"
                    },
                    "timestamp": {
                        "type": "date",
                        "format": "strict_date_optional_time||epoch_millis"
                    },
                    "user": {
                        "type": "string"
                    }
                }
            }
        },
        "settings": {
            "index": {
                "creation_date": "1477512378728",
                "number_of_shards": "5",
                "number_of_replicas": "1",
                "uuid": "4P2QYQNSQBa6ckLX-H1oag",
                "version": {
                    "created": "2040199"
                }
            }
        },
        "warmers": {}
    }
}

Resulting Kibana results...

[image: Kibana screenshot]

Works like a charm, thanks for pointing me in the right direction on this one!
