What's the type of the timestamp field in your source data?
from kafka-connect-elasticsearch.
@shikhar, your question made me reconsider my Slack source connector, which was indeed using Schema.STRING_SCHEMA instead of Timestamp.SCHEMA for the timestamp field.
Full sources for reference:
package com.example.connect.slack;

import com.ullink.slack.simpleslackapi.SlackSession;
import com.ullink.slack.simpleslackapi.events.SlackMessagePosted;
import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory;
import com.ullink.slack.simpleslackapi.listeners.SlackMessagePostedListener;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by boeboe on 24/10/16.
 */
public class SlackSourceTask extends SourceTask implements SlackMessagePostedListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(SlackSourceTask.class);

    private SlackSourceConnectorConfig config;
    private SlackSession slackSession;
    // Filled by the Slack listener (onEvent), drained by the Connect worker (poll).
    private final BlockingQueue<SourceRecord> queue = new LinkedBlockingQueue<>();
    private String topic = "";

    private final Schema keySchema = SchemaBuilder.struct().name("com.slack.key")
            .field("channel", Schema.STRING_SCHEMA)
            .build();

    // "timestamp" uses the Timestamp logical type instead of Schema.STRING_SCHEMA,
    // which is what lets the Elasticsearch sink map it as a date.
    private final Schema valueSchema = SchemaBuilder.struct().name("com.slack.message")
            .field("user", Schema.STRING_SCHEMA)
            .field("channel", Schema.STRING_SCHEMA)
            .field("text", Schema.STRING_SCHEMA)
            .field("timestamp", Timestamp.SCHEMA)
            .build();

    @Override
    public void start(Map<String, String> props) {
        LOGGER.info("Start a SlackSourceTask");
        config = new SlackSourceConnectorConfig(props);
        topic = config.getString(SlackSourceConnectorConfig.KAFKA_TOPIC_CONFIG);
        String apiToken = config.getString(SlackSourceConnectorConfig.SLACK_API_TOKEN_CONFIG);
        LOGGER.info("Going to connect to slack");
        LOGGER.debug("APIToken in use: '{}'", apiToken);
        try {
            slackSession = SlackSessionFactory.createWebSocketSlackSession(apiToken);
            slackSession.connect();
        } catch (IOException e) {
            LOGGER.error("Couldn't open a connection to slack", e);
            throw new ConnectException(e);
        }
        slackSession.addMessagePostedListener(this);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Wait a bounded time instead of blocking forever in queue.take(), so the
        // worker can still pause or stop the task; null means "no data right now".
        SourceRecord first = queue.poll(1, TimeUnit.SECONDS);
        if (first == null) {
            return null;
        }
        List<SourceRecord> records = new ArrayList<>();
        records.add(first);
        queue.drainTo(records); // also hand over anything else already queued
        LOGGER.info("Polled {} new record(s) from queue", records.size());
        return records;
    }

    @Override
    public void stop() {
        LOGGER.info("Stopping the SlackSourceTask");
        if (slackSession == null) {
            return; // start() never got as far as creating a session
        }
        try {
            slackSession.removeMessagePostedListener(this);
            LOGGER.info("Removed slack post message listener");
            slackSession.disconnect();
            LOGGER.info("Disconnected from slack");
        } catch (IOException e) {
            LOGGER.error("Couldn't disconnect from slack", e);
            throw new ConnectException(e);
        }
    }

    @Override
    public String version() {
        return Version.getVersion();
    }

    @Override
    public void onEvent(SlackMessagePosted event, SlackSession session) {
        LOGGER.info("New slack message received '{}' on channel '{}'", event.getMessageContent(),
                event.getChannel().getName());
        // Example timestring is '1469470591.759709' (seconds, microsecond granularity).
        // Dropping the dot yields microseconds (assuming exactly six fractional digits);
        // dividing by 1000 yields epoch milliseconds.
        String timestring = event.getTimestamp();
        long timestamp = Long.parseLong(timestring.replace(".", "")) / 1000;
        String channel = event.getChannel().getName();
        SourceRecord record = new SourceRecord(
                Collections.singletonMap("channel", channel),
                Collections.singletonMap("position", timestring),
                topic,
                null,
                keySchema,
                new Struct(keySchema).put("channel", channel),
                valueSchema,
                new Struct(valueSchema)
                        .put("user", event.getSender().getUserName())
                        .put("channel", channel)
                        .put("text", event.getMessageContent())
                        // Timestamp.toLogical converts epoch millis to java.util.Date
                        .put("timestamp", Timestamp.toLogical(Timestamp.SCHEMA, timestamp))
        );
        queue.add(record);
    }
}
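The conversion in onEvent drops the dot and divides by 1000, which is only correct while Slack's fractional part has exactly six digits. A minimal, standard-library-only sketch of a more defensive conversion uses BigDecimal instead. The class SlackTs and its method toEpochMillis are hypothetical names for illustration; they are not part of the connector or the Slack client library.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class SlackTs {
    // Hypothetical helper: converts a Slack timestamp such as "1469470591.759709"
    // (seconds plus a fractional part) into epoch milliseconds, truncating any
    // sub-millisecond digits. It does not assume a fixed number of fractional digits.
    public static long toEpochMillis(String ts) {
        return new BigDecimal(ts)
                .movePointRight(3)              // seconds -> milliseconds
                .setScale(0, RoundingMode.DOWN) // drop microseconds
                .longValueExact();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1469470591.759709")); // 1469470591759
        System.out.println(toEpochMillis("1469470591.7"));      // 1469470591700
        System.out.println(toEpochMillis("1469470591"));        // 1469470591000
    }
}
```

The returned long could be passed to Timestamp.toLogical(Timestamp.SCHEMA, ...) exactly as onEvent does today; with the dot-stripping approach, "1469470591.7" would instead come out as 14694705, which is far off.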
Resulting Elasticsearch index:
{
  "slack": {
    "aliases": {},
    "mappings": {
      "slack": {
        "properties": {
          "channel": {
            "type": "string"
          },
          "text": {
            "type": "string"
          },
          "timestamp": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
          "user": {
            "type": "string"
          }
        }
      }
    },
    "settings": {
      "index": {
        "creation_date": "1477512378728",
        "number_of_shards": "5",
        "number_of_replicas": "1",
        "uuid": "4P2QYQNSQBa6ckLX-H1oag",
        "version": {
          "created": "2040199"
        }
      }
    },
    "warmers": {}
  }
}
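In this mapping the timestamp field is a date with format strict_date_optional_time||epoch_millis, so Elasticsearch should accept either an ISO-8601 string or a plain count of epoch milliseconds for it. The sketch below (hypothetical class EsDateForms, Java standard library only) renders the example Slack timestamp from the code comment, truncated to milliseconds, in both forms to show they name the same instant.

```java
import java.time.Instant;

public class EsDateForms {
    // epoch_millis form: the raw millisecond count as a decimal string.
    public static String toEpochMillisForm(long epochMillis) {
        return Long.toString(epochMillis);
    }

    // ISO-8601 form in UTC, which strict_date_optional_time accepts.
    public static String toIsoForm(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long millis = 1469470591759L; // '1469470591.759709' truncated to milliseconds
        System.out.println(toEpochMillisForm(millis)); // 1469470591759
        System.out.println(toIsoForm(millis));         // 2016-07-25T18:16:31.759Z
        // Parsing the ISO form back yields the same millisecond value.
        System.out.println(Instant.parse(toIsoForm(millis)).toEpochMilli() == millis); // true
    }
}
```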
Resulting Kibana results: [screenshot]
Works like a charm. Thanks for pointing me in the right direction on this one!