cert-polska / karton

Home Page: https://karton-core.readthedocs.io/en/latest/

License: BSD 3-Clause "New" or "Revised" License


karton's Introduction

Karton

Distributed malware processing framework based on Python, Redis and S3.

The idea

Karton is a robust framework for creating flexible and lightweight malware analysis backends. It can be used to connect malware* analysis systems into a robust pipeline with very little effort.

We've been in the automation business for a long time. We're dealing with more and more threats, and we have to automate everything to keep up with incidents. Because of this, we often end up with many scripts stuck together with duct tape and WD-40. These scripts are written by analysts in the heat of the moment, fragile and ugly - but they work, and produce intel that must be stored, processed further, sent to other systems or shared with other organisations.

We needed a way to take our PoC scripts and easily insert them into our analysis pipeline. We also wanted to monitor their execution, centralise logging, improve robustness, reduce development inertia... For this exact purpose, we created Karton.

* while Karton was designed with malware analysis in mind, it works nicely in every microservice-oriented project.

Installation

Installation is as easy as a single pip install command:

pip3 install karton-core

To set up the whole backend environment you will also need S3-compatible storage and Redis; see the docs for details.
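For reference, a minimal karton.ini sketch - the endpoint, credentials, and bucket are placeholders, and the exact section/key names may differ between karton-core versions (older releases used a [minio] section), so consult the docs:

[s3]
address = http://localhost:9000
access_key = karton-access-key
secret_key = karton-secret-key
bucket = karton

[redis]
host = localhost
port = 6379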

Example usage

To use Karton, you have to provide a class that inherits from Karton.

from karton.core import Karton, Task, Resource

class GenericUnpacker(Karton):
    """
    Performs sample unpacking
    """
    identity = "karton.generic-unpacker"
    filters = [
        {
            "type": "sample",
            "kind": "runnable",
            "platform": "win32"
        }
    ]

    def process(self, task: Task) -> None:
        # Get sample object
        packed_sample = task.get_resource('sample')
        # Log with self.log
        self.log.info(f"Hi {packed_sample.name}, let me analyze you!")
        ...  # unpacking logic elided; assume it produces `filename` and `unpacked` bytes
        # Send our results for further processing or reporting
        task = Task(
            {
                "type": "sample",
                "kind": "raw"
            },
            payload={
                "parent": packed_sample,
                "sample": Resource(filename, unpacked)
            })
        self.send_task(task)

if __name__ == "__main__":
    GenericUnpacker.main()
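For completeness, here is a minimal companion producer that feeds the unpacker above - a sketch, where the identity and the input file path are illustrative:

from karton.core import Producer, Resource, Task

if __name__ == "__main__":
    producer = Producer(identity="sample-feeder")
    # Read a sample from disk (hypothetical input file)
    with open("packed.exe", "rb") as f:
        contents = f.read()
    task = Task({"type": "sample", "kind": "runnable", "platform": "win32"})
    task.add_payload("sample", Resource("packed.exe", contents))
    producer.send_task(task)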

Karton systems

Some Karton systems are universal and useful to everyone. We decided to share them with the community.

This repository. It contains the karton.system service - the main service, responsible for dispatching tasks within the system. It also contains the karton.core module, which is used as a library by other systems.

A small Flask dashboard for task and queue management and monitoring.

The "router". It recognises samples/files and produces various task types depending on the file format. Thanks to this, other systems may only listen for tasks with a specific format (for example, only zip archives).

Generic archive unpacker. Archives uploaded into the system will be extracted, and every file will be processed individually.

Malware extractor. It uses Yara rules and Python modules to extract static configuration from malware samples and analyses. It's a fishing rod, not a fish - we don't share the modules themselves. But it's easy to write your own!

A very important part of the pipeline. Reporter submits all files, tags, comments and other intel produced during the analysis to MWDB. If you don't use MWDB yet or just prefer other backends, it's easy to write your own reporter.

Automatically runs Yara rules on all files in the pipeline, and tags samples appropriately. Rules not included ;).

Karton system that decodes files encoded with common methods, like hex, base64, etc. (You wouldn't believe how common it is).

A small wrapper around AutoIt-Ripper that extracts embedded AutoIt scripts and resources from compiled AutoIt executables.

Automated black-box malware analysis system with the DRAKVUF engine under the hood, which does not require an agent on the guest OS.


This is how these systems can be used to form a basic malware analysis pipeline (diagram omitted here).

Community projects that incorporate the Karton framework

A modular Karton Framework service that unpacks common packers like UPX and others using the Qiling Framework.

A Feature Rich Modular Malware Configuration Extraction Utility for MalDuck

Detect-It-Easy classifier for the Karton framework

RetDec unpacker module for the Karton framework

Malware similarity platform with modularity in mind.

Co-financed by the Connecting Europe Facility of the European Union.

karton's People

Contributors

antelox, bonusplay, catsuryuu, chivay, conitrade-as, gallypette, icedevml, itayc0hen, maitrerenard, mak, mimi89999, msm-code, nazywam, psrok1, rakovskij-stanislav, yankovs


karton's Issues

Task status assertions

Add assertions in karton-system that will prevent/report incorrect task status transitions which may indicate a bug, e.g. a finished/crashed task becoming started, or a state transition for a finished (deleted) task.
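A minimal sketch of what such an assertion could look like (the state names and the transition table are illustrative, not karton's actual definitions):

# Hypothetical whitelist of legal task status transitions
ALLOWED_TRANSITIONS = {
    "Spawned": {"Started", "Finished", "Crashed"},
    "Started": {"Finished", "Crashed"},
    "Finished": set(),       # terminal: any further transition indicates a bug
    "Crashed": {"Started"},  # e.g. a retry triggered from the dashboard
}

def assert_transition(old_status: str, new_status: str) -> None:
    allowed = ALLOWED_TRANSITIONS.get(old_status, set())
    assert new_status in allowed, f"illegal task status transition: {old_status} -> {new_status}"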

Support injecting binds/filters and outgoing task headers via the CLI

It would be superb if it were possible to add binds (filters) to a Karton via the CLI. For example, some kartons were designed to support X types of files, but lack another one that might be relevant only for some users.

In addition, it would be great to be able to add/override headers on outgoing tasks in a similar way. For example, users might want to add status:reported for files that were already reported by karton-mwdb-reporter. Or anything similar.

Support for injecting both headers and binds would work great if users want to add custom filters/headers to their ecosystem without forking the open-source kartons. It might also allow them to drop some headers.

This could be implemented so that users control it via the CLI or configuration; a rough sketch follows.
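A rough sketch of what the CLI side could look like (the flag names and the JSON filter format are invented for illustration):

import argparse
import json

parser = argparse.ArgumentParser()
parser.add_argument("--add-filter", action="append", default=[],
                    help='extra bind filter as JSON, e.g. \'{"kind": "archive"}\'')
parser.add_argument("--set-header", action="append", default=[],
                    help="override an outgoing task header as key=value")
args = parser.parse_args()

# These would be appended to the consumer's binds and merged into
# every outgoing task's headers, respectively.
extra_filters = [json.loads(f) for f in args.add_filter]
header_overrides = dict(kv.split("=", 1) for kv in args.set_header)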


Broken karton-system after recent changes

$ karton-system
Traceback (most recent call last):
  File "/home/chivay/.local/bin/karton-system", line 33, in <module>
    sys.exit(load_entry_point('karton-core', 'console_scripts', 'karton-system')())
  File "/home/chivay/repos/karton/karton/system/system.py", line 219, in main
    parser = cls.args_parser()
  File "/home/chivay/repos/karton/karton/system/system.py", line 213, in args_parser
    parser = super().args_parser()
AttributeError: 'super' object has no attribute 'args_parser'

Scaling issues in karton - thread

Hey guys,
we have been using the karton system for a while now, and we have reached a point where scaling the analysis throughput has become a real issue.
Each time the total number of tasks in Redis climbs above roughly 500k (the specific number isn't important, it depends on the environment), the entire system eventually collapses: karton-system starts crashing constantly, karton-dashboard stops responding, and from there the road to chaos is pretty short.

Just to clarify our situation: the entire infrastructure is hosted as containers on AWS ECS (all kartons, including karton-system), and we use a pretty strong managed Redis server (I don't think its computation power, storage size, or connection capacity is the bottleneck).

I investigated the causes of these crashes and learned a few things I would like to share and discuss:

  1. The karton-system component is a crucial part of the Karton framework, and currently it is also a single point of failure for the entire system. This is an extremely important point which I will come back to later.
  2. Currently, the karton-system functionality can only be partially scaled out - several instances will consume karton.tasks and karton.operations properly, but garbage collection doesn't scale efficiently. Each garbage collection run calls tasks = self.backend.get_all_tasks() and performs collection over this data, which is shared with the other instances of karton-system. I think this component must be made to scale out properly.
  3. The root cause of the crashes under high workload is tasks = self.backend.get_all_tasks() during garbage collection, or more precisely for task_data in self.redis.mget(tasks) inside get_all_tasks(). As the task queue grows, each redis.mget(<all_tasks>) becomes more computation-intensive and slows down the handling of karton operations. This causes the queues to grow even more, and eventually karton-system just goes from one gc call to the next. In the end, in our case, karton-system is killed by the OS due to out-of-memory, but even when it survives, it isn't operational.
  4. In the short term, I would reimplement get_all_tasks() to return a generator instead of a list, querying a chunk of tasks at a time (10k tasks, for example). This would solve some of the stability issues (including the out-of-memory issues) - it is quite trivial, but I can explain if needed; see the sketch after this list.
  5. In the long term, we can still hit situations where karton operations are starved by constant garbage collection. I think we need to find a way to prioritize them, or make the gc a more "lightweight" operation (maybe a Redis list of gc actions? or part of the karton.operations list?).
  6. karton-dashboard - we find this tool extremely valuable for monitoring workloads. Unfortunately, it becomes non-responsive with large queues due to the same get_all_tasks() call on each GET request. Our general usage of karton-dashboard is understanding queue sizes and cancelling/reproducing crashed tasks, so we rarely look at task contents. I would be glad to propose responsiveness improvements through a PR if you don't have any plans to do so.
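For illustration, a minimal sketch of the chunked-generator idea from point 4, assuming the karton.task:* key layout (this is not the actual karton-core code):

import json

def iter_all_tasks(redis_client, chunk_size=10000):
    # Walk the keyspace incrementally instead of mget-ing every task at once
    keys = []
    for key in redis_client.scan_iter(match="karton.task:*", count=chunk_size):
        keys.append(key)
        if len(keys) >= chunk_size:
            for raw in redis_client.mget(keys):
                if raw is not None:
                    yield json.loads(raw)
            keys = []
    if keys:
        for raw in redis_client.mget(keys):
            if raw is not None:
                yield json.loads(raw)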

Thanks again for all the effort you put into this system; we have managed to achieve great things using it so far.
@psrok1 @nazywam

Get task tree status

Hello. I need a way to know whether a task tree is finished.

As far as I can see, the origin task will be garbage collected even if the whole tree is not finished.

For now I use a direct Redis call to get the tree status:

import enum
import json

import redis
from karton.core.task import TaskState  # status enum shipped with karton-core

# `config` is our own settings object holding Redis connection details


class TaskTreeState(enum.Enum):
    # None of the tasks is in the Finished or Crashed state
    RUNNING = "Running"
    # All of the tasks are finished OR no tasks are found
    FINISHED = "Finished"
    # One of the tasks in the tree is in the crashed state.
    # Only the user (via karton-dashboard) or time (via the crash timeout) can help
    CRASHED = "Crashed"


def check_task_tree_status(origin_uid):
    red = redis.Redis(
        host=config.redis_config.get("host"),
        port=config.redis_config.get("port"))

    statuses = []
    for task_b in red.mget(red.keys("karton.task:*")):
        task = json.loads(task_b)
        if task.get("root_uid") == origin_uid:
            task_status = task.get("status")
            print(task_status)
            if not task_status:
                continue
            statuses.append(task_status)
            # if one of them is crashed - consider the whole tree a failure
            if task_status.lower() == TaskState.CRASHED.name.lower():
                return TaskTreeState.CRASHED
    # if any of them is not finished - consider the whole tree as running
    if any(t.lower() != TaskState.FINISHED.name.lower() for t in statuses):
        return TaskTreeState.RUNNING
    return TaskTreeState.FINISHED


check_task_tree_status("da354c49-75a9-4a36-86a0-1f06b96531cc")

I propose implementing something similar in karton-system.
And, if possible, caching the tree status so there is no need to iterate over all the tasks in Redis.

Remove remnants of MinIO

Karton was migrated to boto3; however, there are still a few places where mentions of MinIO remain (including the readme).

$ rg -i minio | wc -l
46

Args parser is not extensible

I wanted to add an --identity flag to the Karton.args_parser method, but there's no way to do it without breaking backward compatibility.

I think that instead of passing config: Config to the constructor, we should pass args. I.e. instead of:

        config = Config(args.config_file)
        service = ConfigExtractor(
            config,
            modules=args.modules,
            result_tags=args.tag,
            result_attributes=dict(attributes),
        )

we should use:

        service = ConfigExtractor(
            args,
            modules=args.modules,
            result_tags=args.tag,
            result_attributes=dict(attributes),
        )

I don't think it's a good idea to break backward compatibility because of this (even though most kartons don't override their main() method). But it's an interesting lesson and worth keeping in mind for the future.

Add optional redis auth to karton

There are a few situations where authentication to Karton's central Redis server is desirable:

  1. Karton is deployed on a public network, and the Redis server has to be accessible to other services (but obviously not to the whole Internet)
  2. Defence in depth, or more complex access control (for example, read-only users)

This change should be relatively easy to implement, and we already require auth for the central MinIO server.

Clean handling of dropped connections to Redis

When the Redis server dies abruptly - for example, when the machine is force-restarted - Karton services become unresponsive indefinitely and won't recover after Redis comes back.

This should be resolved by either:

  • reconnecting automatically and restoring the operation
  • or exiting the process
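A minimal sketch of the first option, using redis-py's exception types; the backoff policy is illustrative, and it should be combined with a socket_timeout on the client so a dead socket raises instead of blocking forever:

import time
import redis

def blpop_with_retry(r: redis.Redis, queues, timeout=5):
    # Retry forever instead of hanging on a dead connection
    while True:
        try:
            return r.blpop(queues, timeout=timeout)
        except (redis.ConnectionError, redis.TimeoutError):
            time.sleep(1)  # back off, then try again once Redis is back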

RFC: Controllable task properties

Buckle up, it's a long one!
I'd like to suggest the use of a "properties" payload (or a Karton Task class member) with some common properties one can define for a task in Karton.
This properties payload is currently used at Check Point Research in our production kartons and MWDB. We think it would be wise to use it across CERT.PL's existing kartons.

*Note that we use a payload as the implementation example, but it could also be anything else.

The idea

Encourage users to add a payload called properties to their tasks (if it becomes part of Karton itself, a dictionary item on the Task class would be better). These properties will instruct several kartons (developed by CERT.PL) how to treat the task or the payloads inside it. The properties we found useful are the following:

  • volatile: boolean, defaults to false
  • emulate: boolean, defaults to true
  • force_analyze: boolean, defaults to false
  • force_emulate: boolean, defaults to false

Usage

I think the best way to show why it might be a good idea is with examples from actual, running kartons.

volatile

The main usage of the volatile property is to instruct kartons to treat the task's sample as volatile, meaning there's no need to report it to external databases (e.g. MWDB, MISP). An easy implementation in karton-mwdb-reporter that prevents the karton from reporting a file to MWDB would look like this:

def process(self, task):
    # Check whether the task is marked as volatile.
    if task.has_payload("properties"):
        if task.get_payload("properties").get("volatile") == True:
            self.log.info("Task is volatile. Finishing here.")
            return
    ...

This way, samples won't be reported to MWDB when a positive volatile property is present - either because it is some auxiliary sample made for testing, or a sample not interesting enough to be uploaded to the database. Consequently, another simple yet nice use of the volatile property is to create internal testing/utility producers for developers/analysts/researchers, etc.
For instance, let's take a look at tester-producer.py:

import os, sys
from karton.core import Producer, Resource, Task

producer = Producer()

filename = sys.argv[1]
with open(filename, "rb") as f:
    content = f.read()

resource = Resource(os.path.basename(filename), content)

task = Task({"type": "sample", "kind": "raw"})
task.add_payload("sample", resource)
task.add_payload("properties", {"volatile": True})

producer.send_task(task)

It can help test staging or even production environments without needing to store the files in a database (MWDB), while still exercising the whole flow of the system. It can also be used to quickly analyze a file without having it show up in MWDB or get sandboxed -- that's what the emulate keyword is for.

As an example of something we've done recently using this approach: we sent millions of PDF files to our karton system, and we didn't want them stored in MWDB unless we believed a PDF sample contained an exploit. So we produced these millions of tasks with the volatile flag turned on, and prevented them from flooding MWDB.

emulate

Another useful application is controlling the sandboxing and analysis of samples, in case you've set up some system to sandbox samples and report the results back into karton (which we all do).
Let's say you've set up feeders to get samples from different sources. It may happen that the same samples come from different sources again and again. In such cases we do not want to re-sandbox the sample, since emulating a file in a sandbox is costly and will probably pollute the database with redundant dumps and artifacts. A simple solution might be adding the following code to the kartons responsible for sending samples to be sandboxed:

# a previous check was done to ensure the sample
# was already emulated
if task.get_payload("properties", {}).get("emulate") == False:
    self.log.info("Not going to emulate it.")
    ...
else:
    # logic that sends to a sandbox
    ...

force_analyze and force_emulate

These two, as the names suggest, force a sample to be analyzed/sandboxed. In the case of sandboxing, force_emulate overrides emulate, whatever its value may be. The main usage of these two is for sample sources which heuristically produce "interesting" samples - for example, the upload or "reanalyze" buttons in MWDB. It is reasonable to assume that if an analyst chose to upload a sample, or re-analyze it, it is because they think it's an interesting one and would want it to be emulated.
Therefore, we suggest the following small change in mwdb-core/mwdb/core/karton.py:

task = Task(
    headers={"type": "sample", "kind": "raw", "quality": feed_quality},
    payload={
        "sample": Resource(file.file_name, path=path, sha256=file.sha256),
        "attributes": file.get_attributes(as_dict=True, check_permissions=False),
        "properties": {"force_analyze": True, "force_emulate": True},  # ensures what we wanted
    },
    priority=task_priority,
)

And thus, when implementing a karton/system to dispatch samples from karton into a sandbox, this simple logic will work:

if task.get_payload("properties", {}).get("force_emulate") == True:
    # send to a sandbox logic
    ...
elif task.get_payload("properties", {}).get("emulate") == True:
    # send to a sandbox logic
    ...
else:
    # no emulation for you
    ...

These properties can be mixed and matched for different scenarios. For example:
  1. force analyze a complete flow of an existing file (reanalyze on MWDB)

    properties: {
        "force_analyze": True,
        "force_emulate": True,
    }
  2. new files to be analyzed by kartons without emulation and without saving

    properties: {
        "emulate": False,
        "volatile": True
    }
  3. normal file analysis (default values):

    Could be without properties
    
  4. normal file analysis without sandbox:

    properties: {
        "emulate": False,
    }

Additional points

backwards compatibility(?)

The only case in which backwards compatibility is broken is if someone has already implemented the same properties payload in their kartons and wrote logic revolving around these exact properties. Then it might cause a conflict.
However, I believe this is not a problem, because it is way too specific a change to the logic. Also, anyone who actually made such changes is using a forked version of the official kartons (the reporter, for example). So there are two cases: either this addition helps them move to the upstream version and stop using forked repos, or they continue using a fork as they have until now.

Must it be a payload?

No, it does not have to be. It could also be something integrated directly into the Task class, like "priority". We used a payload because it was the easiest to implement without branching from karton-core and the other kartons.
Another alternative might be adding these properties to the task headers. It may be a personal preference, but I think they belong in a dictionary field (payload or not) called properties, because these fields are ultimately user-controllable, whereas things like kind or type are internal values inferred as the task moves through different kartons.

Ok, so what actually needs to be changed?

The two main changes were mentioned above, in karton-mwdb-reporter and in karton.py from mwdb-core. Another place where it makes sense is, for example, the example drakvuf producer.
If there are other places where you think this might be useful, we'd be glad to discuss and implement it if needed.
Another thing that will probably need to change is the docs, to let people know this exists. This issue can be used as a basis for the docs. In any case, we have no problem handling the documentation :)


Of course, these are just a limited number of examples and anyone can extend this idea to whatever they might need. Let us know what you think about it.

redis.exceptions.ReadOnlyError: You can't write against a read only replica

I'm deploying a Karton stack as follows, with separate MinIO and Redis stacks. It stays up for a few hours, and then some services start crashing at random with the traceback below (i.e., ReadOnlyError). The rest of the systems using Redis (MWDB, Drakvuf) work fine.

Anything I should pay attention to?

version: "3.7"


x-karton-common: &karton-common
  volumes:
    - /data/stacks/karton/karton.docker.ini:/etc/karton/karton.ini
    - /data/stacks/mwdb/mwdb.ini:/app/mwdb.ini
  networks:
    - minio_external
    - redis
    - karton

services:
  karton-system:
    <<: *karton-common
    image: certpl/karton-system
    entrypoint: karton-system
    command: --setup-bucket

  karton-mwdb-reporter:
    <<: *karton-common
    image: certpl/karton-mwdb-reporter
  
  karton-classifier:
    <<: *karton-common
    image: certpl/karton-classifier

  karton-archive-extractor:
    <<: *karton-common
    image: certpl/karton-archive-extractor
  
  karton-config-extractor:
    <<: *karton-common
    image: certpl/karton-config-extractor
    volumes:
      - /data/stacks/karton/karton.docker.ini:/etc/karton/karton.ini
      - /data/stacks/mwdb/mwdb.ini:/app/mwdb.ini
      - /data/malduck-extractor-modules:/modules
    
  karton-autoit-ripper:
    <<: *karton-common
    image: certpl/karton-autoit-ripper
    
  karton-yaramatcher:
    <<: *karton-common
    image: certpl/karton-yaramatcher
    command: karton-yaramatcher --rules /rules
    volumes:
      - /data/yara-rules/reversinglabs-yara-rules/yara:/rules
      - /data/stacks/karton/karton.docker.ini:/etc/karton/karton.ini
      - /data/stacks/mwdb/mwdb.ini:/app/mwdb.ini
    
  karton-asciimagic:
    <<: *karton-common
    image: certpl/karton-asciimagic

  karton-dashboard:
    <<: *karton-common
    image: certpl/karton-dashboard
    networks:
      - redis
      - minio_external
      - karton
      - public

networks:
  minio_external:
    external: true
  redis:
    external: true
  karton:
    external: true
  public:
    external: true
[2022-06-08 11:26:02,379][INFO] Manager karton.system started
/usr/local/lib/python3.7/site-packages/karton/core/logger.py:59: UserWarning: There is no active log consumer to receive logged messages.
  warnings.warn("There is no active log consumer to receive logged messages.")
Traceback (most recent call last):
  File "/usr/local/bin/karton-system", line 8, in <module>
    sys.exit(SystemService.main())
  File "/usr/local/lib/python3.7/site-packages/karton/system/system.py", line 309, in main
    service.loop()
  File "/usr/local/lib/python3.7/site-packages/karton/system/system.py", line 251, in loop
    self.process_routing()
  File "/usr/local/lib/python3.7/site-packages/karton/system/system.py", line 230, in process_routing
    [KARTON_TASKS_QUEUE, KARTON_OPERATIONS_QUEUE], timeout=5
  File "/usr/local/lib/python3.7/site-packages/karton/core/backend.py", line 401, in consume_queues
    return self.redis.blpop(queues, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/redis/commands/core.py", line 2466, in blpop
    return self.execute_command("BLPOP", *keys)
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 1231, in execute_command
    lambda error: self._disconnect_raise(conn, error),
  File "/usr/local/lib/python3.7/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 1229, in <lambda>
    conn, command_name, *args, **options
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 1204, in _send_command_parse_response
    return self.parse_response(conn, command_name, **options)
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 1243, in parse_response
    response = connection.read_response()
  File "/usr/local/lib/python3.7/site-packages/redis/connection.py", line 842, in read_response
    raise response
redis.exceptions.ReadOnlyError: You can't write against a read only replica.

Here's how Redis is deployed (of course this is not a prod environment):

version: "3.6"
services:
  redis:
    networks:
      - redis
    image: redis
    ports:
      - 6379:6379


networks:
  redis:
    external: true

Implement Zipfile wrapper instead of DirectoryResource

DirectoryResource is a specialized type of resource that contains a zip-packed bunch of files and is used mainly for dumps. That type is marked with a special flag and deserialized directly into a DirectoryResource object.

The problem is that sometimes the task producer wants to send ready-made zip files along with other payload, without knowing that zip files are special and should be serialized as DirectoryResource instead of Resource (the Drakvuf Sandbox case).

In addition, the separate DirectoryResource class complicates the inheritance model, making it two-dimensional (local/remote, file/directory), which is unnecessary. Instead of treating a zip file as a different type of resource, we should just provide a bunch of utility functions in Karton that simplify packing/unpacking operations on Resources containing zips.
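A minimal sketch of the utility-function approach (the function names are invented; Resource is the standard karton-core class):

import io
import zipfile
from karton.core import Resource

def make_zip_resource(name, files):
    """Pack a {filename: bytes} mapping into a single zip-backed Resource."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        for fname, content in files.items():
            zf.writestr(fname, content)
    return Resource(name, buf.getvalue())

def unpack_zip_resource(resource):
    """Read a zip-backed resource back into a {filename: bytes} mapping."""
    with zipfile.ZipFile(io.BytesIO(resource.content)) as zf:
        return {name: zf.read(name) for name in zf.namelist()}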

Consumer filter: exclusions and simple wildcards

It might be useful to have negative filters and simple wildcard pattern matching.

Wildcard matching is simple: just add a * that will be checked by karton.system, with the possibility of escaping that character if we want to match a literal *. For negative filters, I'm not sure whether it's better to have an additional exclusion list, or a {"kind": "!raw"} filter prefixed with ! that matches everything other than raw; a rough sketch of the latter follows the link below.

https://github.com/CERT-Polska/karton/blob/master/karton/core/task.py#L155
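A minimal sketch of the proposed semantics, using fnmatch-style wildcards plus !-negation (this is not karton's actual matching code, and it doesn't handle escaping):

from fnmatch import fnmatchcase

def value_matches(filter_value, header_value):
    if filter_value.startswith("!"):
        return not fnmatchcase(header_value, filter_value[1:])
    return fnmatchcase(header_value, filter_value)

def filter_matches(bind_filter, headers):
    return all(
        key in headers and value_matches(value, headers[key])
        for key, value in bind_filter.items()
    )

# {"kind": "!raw"} matches any kind other than "raw"
assert filter_matches({"kind": "!raw"}, {"type": "sample", "kind": "zip"})
assert not filter_matches({"kind": "!raw"}, {"type": "sample", "kind": "raw"})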

urllib3.exceptions.ProtocolError: Connection broken: IncompleteRead

I'm running the following consumer on 20 replicas:

# rulematcher/rulematcher.py
...
    def process(self, task: Task) -> None:  # type: ignore
        headers = task.headers
        sample: ResourceBase = task.get_resource("sample")
        analysis = None

        if headers["type"] == "sample":
            if sample.content is None:
                return None

            log.info("Processing sample %s", sample.metadata.get("sha256"))
...

The if sample.content is None line randomly triggers this error:

[
        "Traceback (most recent call last):\n",
        "  File \"/usr/local/lib/python3.7/site-packages/urllib3/response.py\", line 441, in _error_catcher\n    yield\n",
        "  File \"/usr/local/lib/python3.7/site-packages/urllib3/response.py\", line 518, in read\n    data = self._fp.read() if not fp_closed else b\"\"\n",
        "  File \"/usr/local/lib/python3.7/http/client.py\", line 478, in read\n    s = self._safe_read(self.length)\n",
        "  File \"/usr/local/lib/python3.7/http/client.py\", line 630, in _safe_read\n    raise IncompleteRead(b''.join(s), amt)\n",
        "http.client.IncompleteRead: IncompleteRead(2097152 bytes read, 15508683 more expected)\n",
        "\nDuring handling of the above exception, another exception occurred:\n\n",
        "Traceback (most recent call last):\n",
        "  File \"/usr/local/lib/python3.7/site-packages/karton/core/karton.py\", line 178, in internal_process\n    self.process(self.current_task)\n",
        "  File \"/usr/local/lib/python3.7/site-packages/karton/rulematcher/rulematcher.py\", line 260, in process\n    if sample.content is None:\n",
        "  File \"/usr/local/lib/python3.7/site-packages/karton/core/resource.py\", line 413, in content\n    return self.download()\n",
        "  File \"/usr/local/lib/python3.7/site-packages/karton/core/resource.py\", line 467, in download\n    self._content = self.backend.download_object(self.bucket, self.uid)\n",
        "  File \"/usr/local/lib/python3.7/site-packages/karton/core/backend.py\", line 600, in download_object\n    return reader.read()\n",
        "  File \"/usr/local/lib/python3.7/site-packages/urllib3/response.py\", line 544, in read\n    raise IncompleteRead(self._fp_bytes_read, self.length_remaining)\n",
        "  File \"/usr/local/lib/python3.7/contextlib.py\", line 130, in __exit__\n    self.gen.throw(type, value, traceback)\n",
        "  File \"/usr/local/lib/python3.7/site-packages/urllib3/response.py\", line 458, in _error_catcher\n    raise ProtocolError(\"Connection broken: %r\" % e, e)\n",
        "urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(2097152 bytes read, 15508683 more expected)', IncompleteRead(2097152 bytes read, 15508683 more expected))\n"
    ]

The culprit seems to be this code in karton/backend.py:

    def download_object(self, bucket: str, object_uid: str) -> bytes:
        """
        Download resource object from object storage.

        :param bucket: Bucket name
        :param object_uid: Object identifier
        :return: Content bytes
        """
        reader = self.minio.get_object(bucket, object_uid)
        try:
            return reader.read()
        finally:
            reader.release_conn()
            reader.close()

I have about 1000 active tasks. The MinIO stack is deployed as follows:

version: "3.9"

# Settings and configurations that are common for all containers
x-minio-common: &minio-common
  image: quay.io/minio/minio:RELEASE.2022-05-08T23-50-31Z
  command: server --console-address ":9001" http://minio{1...4}/data{1...2}
  environment:
    MINIO_ROOT_USER: "***"
    MINIO_ROOT_PASSWORD: "***"
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
    interval: 30s
    timeout: 20s
    retries: 3
  networks:
    - minio_internal
  deploy:
    restart_policy:
      condition: on-failure
    resources:
      limits:
        memory: 512M
        cpus: "0.5"

# starts 4 docker containers running minio server instances.
# using nginx reverse proxy, load balancing, you can access
# it through port 9000.
services:
  minio1:
    <<: *minio-common
    hostname: minio1
    volumes:
      - /data/shares/stor04/minio/data1-1:/data1
      - /data/shares/stor04/minio/data1-2:/data3

  minio2:
    <<: *minio-common
    hostname: minio2
    volumes:
      - /data/shares/stor04/minio/data2-1:/data1
      - /data/shares/stor04/minio/data2-2:/data2

  minio3:
    <<: *minio-common
    hostname: minio3
    volumes:
      - /data/shares/stor04/minio/data3-1:/data1
      - /data/shares/stor04/minio/data3-2:/data2

  minio4:
    <<: *minio-common
    hostname: minio4
    volumes:
      - /data/shares/stor04/minio/data4-1:/data1
      - /data/shares/stor04/minio/data4-2:/data2

  nginx:
    image: nginx:alpine
    hostname: minio
    volumes:
      - /data/stacks/minio/nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - minio1
      - minio2
      - minio3
      - minio4
    networks:
      - minio_internal
      - minio_external
      - public
    ports:
      - 9000:9000
      - 9001:9001
    deploy:
      restart_policy:
        condition: on-failure
      resources:
        limits:
          memory: 256M
          cpus: "0.5"

networks:
  minio_external:
    external: true
    name: minio_external
  minio_internal:
    external: true
    name: minio_internal
  public:
    external: true
    name: public

KartonTest: mock backend instead of changing base class.

karton.test is a unittest suite for testing Karton services.

Currently it mocks Karton classes by forcefully changing their base class from Consumer to KartonMock. Unfortunately, this approach doesn't work well when a Karton service implements a custom __init__ method.

With KartonBackend we can just set up a mock at the redis/minio level instead of hacking the Consumer class.

Redis client name is lost after reconnection

I found that when network issues occur, Karton replicas sometimes appear as inactive even though they're still up and running. The reason is the use of the client_setname command, which doesn't set the client_name property on the connection pool.

The client_name constructor argument should be used instead if we want the client name to be set for a specific Redis connection.

In [23]: r = redis.Redis.from_url("redis://redis/")

In [24]: r.client_setname("test-name")
Out[24]: True

In [25]: r.client_getname()
Out[25]: 'test-name'

In [26]: r.connection_pool.disconnect()

In [27]: r.client_getname()

-----------------------

In [28]: r = redis.Redis.from_url("redis://redis", client_name="test-name")

In [29]: r.client_getname()
Out[29]: 'test-name'

In [30]: r.connection_pool.disconnect()

In [31]: r.client_getname()
Out[31]: 'test-name'

Karton Task tree

I use pydot to build a graph of a task analysis using root_uid. For now I'm storing all events using a log consumer, and I use this info to draw the graph. Are there any methods in karton-core to get the analysis tree by root_uid, or will the info about completed tasks be deleted by karton-core?

Task timeouts

Karton services should have an option to specify the maximum amount of time that task processing can take.

This should be done using a class field like task_timeout; if the value is set to None or missing, the timeout shouldn't be enforced (the current behavior).

Tasks that time out should be marked as such and moved to the error queue, similarly to crashed tasks.
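A sketch of the proposed interface from a service author's point of view (task_timeout is the suggested, not-yet-existing field):

from karton.core import Karton, Task

class SlowService(Karton):
    identity = "karton.slow-service"
    filters = [{"type": "sample"}]
    task_timeout = 600  # proposed: seconds before the task is marked as timed out

    def process(self, task: Task) -> None:
        # processing that exceeds task_timeout would move the task
        # to the error queue, like a crashed task
        ...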

Documentation and User guide

We have a draft in this repository (docs), but it needs to be finished before we announce the official release.

Hard shutdown doesn't work now

Attempting a hard shutdown (2x Ctrl-C) raises an exception:

[2020-10-26 14:04:58,168][INFO] Gracefully shutting down!
[2020-10-26 14:05:06,221][ERROR] Failed to process task - 31231ae2-b6d0-4fb7-883b-8fa42140d4a3
Traceback (most recent call last):
  File "/home/arkadiuszwr/Systemy/karton/karton/core/karton.py", line 182, in internal_process
    self.process()
  File "Basic_model_updater.py", line 113, in process
    apivector_by_dump = self.retrieve_from_cache(sample_hash)
  File "Basic_model_updater.py", line 50, in retrieve_from_cache
    time.sleep(1)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/arkadiuszwr/Systemy/karton/karton/core/karton.py", line 190, in internal_process
    self._run_post_hooks(saved_exception)
UnboundLocalError: local variable 'saved_exception' referenced before assignment

SHA256 pre-evaluation while Resource object is created

For all Resources we want to evaluate the SHA256 sum by default.

  • sha256 should be available both as Resource.metadata["sha256"] and Resource.sha256 (backwards compatibility)
  • sha256 should be accepted by the constructor so the user can provide a pre-evaluated hash as an optimization
  • Maybe it would be nice to provide an option to disable the SHA256 pre-evaluation via a constructor flag; a usage sketch follows this list.
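From the caller's perspective, the behavior would look roughly like this (a sketch of the proposal, assuming the constructor accepts a pre-evaluated sha256 as described above):

import hashlib
from karton.core import Resource

content = b"example sample content"

# evaluated eagerly by default
resource = Resource("sample.bin", content)
assert resource.metadata["sha256"] == hashlib.sha256(content).hexdigest()

# or supplied up front as an optimization
resource = Resource("sample.bin", content, sha256=hashlib.sha256(content).hexdigest())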

Warn user when Karton logs queue stays unconsumed

A common mistake users may make is not running a log consumer, which will result in the Redis instance filling up RAM.

Possible solutions:

  • emphasize very clearly in the documentation that the logger service must be set up along with the karton-system part
  • warn users via karton-system if the logger part is not active
  • ???

Unittest - LocalResource vs RemoteResource

Hello,
I was trying to write some tests for my plugins in the way described here:
testing-resources

however, when we create a fake resource this way:

        # create fake, mini-independent resources
        input_sample = Resource("sample.txt", input_data)

it becomes a LocalResource. And then when we create a task from it and pass it to our karton with:

        # dry-run the fake task on the wrapped karton system
        results = self.run_task(task)

it is still a LocalResource. As such, it does not have the download_temporary_file method, which is used in plugins (e.g. the Strings plugin from your my-first-karton-task example). When I run the unittest, it throws "AttributeError: 'LocalResource' object has no attribute 'download_temporary_file'" because of that.

  1. Am I doing something wrong, or is your example inaccurate?
  2. Can you provide a fully covered example using testing-resources and the download_temporary_file method in a plugin?
  3. Is it dependent on the karton-core version?

Thanks for any answers

Q: Producing multiple tasks

I'm posting this question here because there's no GitHub Discussions tab.

Is it allowed to spawn multiple tasks in sequence from within the same producer? I noticed that none of my tasks ever complete if I do the following:

...
        tag_task = Task(
            {
                "type": "sample",
                "stage": "analyzed",
                "kind": "runnable",
            },
            payload={
                "sample": sample,
                "parent": sample,
                "tags": tags,
            },
        )
        self.send_task(tag_task)

        blob_task = Task(
            {"type": "blob", "kind": "my-custom-json"},
            payload={
                "parent": sample,
                "content": my_stuff,
            },
        )
        self.send_task(blob_task)
...

So I'm wondering whether this is an acceptable pattern, or whether I'm violating some assumptions that I'm not aware of.

Store relationships between routed and unrouted tasks

It's best described with a diagram (omitted here).

Because routed tasks are forked from the unrouted ones, they do not contain information about the original unrouted task. Both keep the same parent reference, and as a result we sometimes cannot reconstruct the whole task tree.
This gets especially annoying when a consumer spawns 2 new, similar tasks from a single incoming task.

The discussed solutions include:

  • storing the unrouted task id inside the routed tasks on forks - adds a bit of complexity (yet another task field) but shouldn't break too many existing solutions
  • setting the forked task's parent to the unrouted task's id - elegant and clean, but this will probably break a few things

Provide customizable CLI interface - KartonBase.main()

Karton services should be invokable via:

  • karton.classifier.Classifier.main()
  • python -m karton.classifier
  • $ karton-classifier

This will be universal for all Karton services, so it would be nice to move the common code to the karton-core library.

Accept IOBase stream as local resource input

In addition to a buffer and a path, we should accept a stream as well. The only issue is the size, which must be provided by the user in that case.

Such a stream will still be owned by the caller (Resource.upload doesn't close the stream); a call-site sketch follows.
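A sketch of what the call site could look like (the fd and size parameter names are invented for illustration; this is not the current karton-core API):

import io
from karton.core import LocalResource

stream = io.BytesIO(b"sample content")  # caller-owned; Karton would not close it
resource = LocalResource(
    "dump.bin",
    fd=stream,                         # hypothetical stream argument
    size=len(stream.getbuffer()),      # size must be supplied for streams
)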

Support for Redis authentication

In some cases, Redis may require authentication before accessing the database.
Pass the username and/or password when instantiating the Redis object.
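redis-py already supports this directly, so the change is mostly about plumbing the settings through Karton's config - for example:

import redis

r = redis.Redis(
    host="redis.internal",  # placeholder host
    port=6379,
    username="karton",      # optional ACL username (Redis 6+)
    password="s3cret",      # AUTH password
)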

RemoteResource downloads sometimes fail

🤔

Traceback (most recent call last):
   File "/opt/venvs/drakcore/lib/python3.7/site-packages/drakcore/process.py", line 86, in process
    outputs = plugin.handler(self.current_task, task_resources, self.minio)
   File "/opt/venvs/drakcore/lib/python3.7/site-packages/drakcore/postprocess/log_index.py", line 58, in generate_log_index
    with resource.download_temporary_file() as tmp_file:
   File "/usr/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
   File "/opt/venvs/drakcore/lib/python3.7/site-packages/karton2/resource.py", line 282, in download_temporary_file
    self.download_to_file(tmp.name)
   File "/opt/venvs/drakcore/lib/python3.7/site-packages/karton2/resource.py", line 257, in download_to_file
    self._minio.fget_object(self.bucket, self.uid, path)
   File "/opt/venvs/drakcore/lib/python3.7/site-packages/minio/api.py", line 691, in fget_object
    raise InvalidSizeError(msg)
 minio.error.InvalidSizeError: InvalidSizeError: message: Data written 30748 bytes is smaller than the specified size 31171 bytes
