
dgarnitz / vectorflow

647 stars · 10 watchers · 46 forks · 394 KB

VectorFlow is a high volume vector embedding pipeline that ingests raw data, transforms it into vectors and writes it to a vector DB of your choice.

Home Page: https://www.getvectorflow.com/

License: Apache License 2.0

Python 92.09% Dockerfile 1.81% Shell 6.10%
ai data-engineering embeddings machine-learning nlp vectors

vectorflow's People

Contributors

bm777, danmeier2, david-vectorflow, dgarnitz, dkimb0, eteimz, farhan0167, kpriver55

vectorflow's Issues

Implement Metrics Tracking / Telemetry

We need a way to collect usage data to better understand how users are utilizing VectorFlow. This should be anonymous so that no user-specific data is ever aggregated.

We may want to consider putting these metrics into Prometheus.
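
As a rough illustration only, a minimal sketch of anonymous usage counters exposed for Prometheus scraping via the prometheus_client library; the metric names and the worker hook are assumptions, not existing VectorFlow code.

# Hypothetical sketch: anonymous usage counters for a Prometheus scraper.
# Metric names, labels and the hook are illustrative assumptions.
from prometheus_client import Counter, start_http_server

jobs_submitted = Counter("vectorflow_jobs_submitted_total", "Jobs submitted", ["embedding_type"])
batches_processed = Counter("vectorflow_batches_processed_total", "Batches processed", ["status"])

def record_job_submission(embedding_type: str) -> None:
    # Only aggregate counts are recorded - no user-specific payload data.
    jobs_submitted.labels(embedding_type=embedding_type).inc()

if __name__ == "__main__":
    start_http_server(9100)  # expose /metrics for Prometheus to scrape
    record_job_submission("OPEN_AI")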

Fix Job Status Logic

Now that batches are broken up into mini-batches for embedding and upload to a vector DB, the logic that ties a Job's status to the total number of batches successfully completed is wrong.

That logic needs to be tied to mini-batches, which either need to be tracked on the batch object OR a new mini-batch table needs to be created.
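
For reference, a minimal sketch of what the new-table option could look like with SQLAlchemy; the table, column names and status values are assumptions, not the existing schema.

# Hypothetical SQLAlchemy model for tracking mini-batches; names are illustrative only.
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class MiniBatch(Base):
    __tablename__ = "mini_batches"

    id = Column(Integer, primary_key=True)
    batch_id = Column(Integer, ForeignKey("batches.id"), nullable=False)
    status = Column(String, default="NOT_STARTED")  # e.g. NOT_STARTED / COMPLETED / FAILED

# A job would then be marked COMPLETED only when every mini-batch of every batch is COMPLETED.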

Integrate with Milvus Cloud

VectorFlow needs to support Milvus:

  • Add the upsert logic
  • The upload / upsert should include source_data and an ID generated by vectorflow to prevent duplicate uploads
  • Add a MILVUS_KEY

Investigate adding a sentiment classifier for metadata extraction

Vectorflow should support configurable metadata for upload to vector databases. One common form of metadata is sentiment keywords, which help with filtering during hybrid searches. During prototyping this is done with LLMs, but that is too expensive for production.

We need to investigate whether something like XGBoost can come close to the performance of an LLM; a rough benchmarking sketch is below. If so, we should add it to the worker in a follow-up issue.
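
As a rough starting point for that investigation, a minimal sketch assuming a labelled sentiment dataset and the scikit-learn and xgboost packages; the data and hyperparameters below are placeholders.

# Hypothetical benchmark sketch: TF-IDF features + XGBoost as a cheap sentiment classifier.
# The texts/labels are placeholders for whatever labelled data the evaluation uses.
from sklearn.feature_extraction.text import TfidfVectorizer
from xgboost import XGBClassifier

texts = ["great product", "love it", "terrible support", "very disappointing"]
labels = [1, 1, 0, 0]  # 1 = positive, 0 = negative

vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(texts)

model = XGBClassifier(n_estimators=50, max_depth=3)
model.fit(X, labels)

# Compare predictions (and latency/cost) against the LLM-based labelling.
print(model.predict(vectorizer.transform(["really unhappy with this"])))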

Add a reliability mechanism

The Hugging Face, vector DB upload and OpenAI embeddings workers all need a retry mechanism.

The queue system could be leveraged for this, either as a general retry queue at each stage or one per individual worker.

There should be logic to prevent retries when critical system components are down (like OpenAI's API or a vector DB's host).
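
A minimal sketch of what a queue-based retry could look like with pika; the queue name, the retry-count header and the downstream health check are assumptions, not existing worker code.

# Hypothetical sketch of a per-worker retry via the existing queue, assuming pika.
# Queue names, the header field and the health check are illustrative assumptions.
import pika

MAX_RETRIES = 3

def requeue_or_fail(channel, queue_name, body, headers, downstream_healthy):
    retries = (headers or {}).get("x-retries", 0)
    if not downstream_healthy:
        # Circuit-breaker style: don't retry while OpenAI's API or the vector DB host is down.
        return False
    if retries >= MAX_RETRIES:
        return False
    channel.basic_publish(
        exchange="",
        routing_key=queue_name,
        body=body,
        properties=pika.BasicProperties(headers={"x-retries": retries + 1}),
    )
    return True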

Add integration to salesforce

Vectorflow should be able to ingest raw data from Salesforce.

Some open questions to explore prior to implementation:

  • can this be done through the existing API or does it need a separate ingestion worker
  • how to share credentials / do this securely
  • what file formats can be expected

Upgrade qdrant upload to be async, multithreaded

Right now qdrant uses gRPC but not asynchronously and multithreaded with asyncio. This requires a larger rewrite of the worker because asyncio only allows one event loop per program.

Can follow this example here of how to implement it.

Based on earlier tests, it is confirmed that it is faster to upload each vector individually following the code example above than it is to batch them.
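
A minimal sketch of the direction this could take, assuming a recent qdrant-client that ships AsyncQdrantClient; the collection name, vector size and payload shape are placeholders.

# Hypothetical sketch: per-vector async upserts to Qdrant over gRPC on a single event loop.
# Collection name, vector size and payloads are placeholders.
import asyncio
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct

async def upload(points):
    client = AsyncQdrantClient(host="localhost", grpc_port=6334, prefer_grpc=True)
    # One upsert per vector, issued concurrently rather than as a single batch.
    await asyncio.gather(*[
        client.upsert(collection_name="test-index", points=[point])
        for point in points
    ])

points = [PointStruct(id=i, vector=[0.1] * 1536, payload={"source_text": "..."}) for i in range(10)]
asyncio.run(upload(points))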

Add a testing client

Configure cURL requests, Python or TS code to test vectorflow during development.

Add a testing script or client that:

  • spins up the needed infra
  • sets env vars
  • makes it easy to submit a file for embedding and check that the vectors are in the vector DB
  • bonus points if it can sanity check some of the chunks

Suggestion: Docs website

For this project it would be good to have a documentation website with the various technical documentation and support, outside of just the README and the main getvectorflow.com site.

Something simple using GitHub Pages and MkDocs, hosted on docs.getvectorflow.com, would be very user friendly, with architecture overviews, getting-started examples and API docs.

Examples:
https://kops.sigs.k8s.io/getting_started/install/
https://squidfunk.github.io/mkdocs-material/blog/2021/12/27/the-past-present-and-future/

Integrate with DeepLake

DeepLake can function as a vector store. We want to add an integration to the worker so it can write to DeepLake.

DeepLake typically runs in your own cloud; there may be a way to run it locally with Docker, but it is not available as a hosted, managed service. So this issue will also require bootstrapping DeepLake.

Deploy Via Docker + HF = worker error

Hello,

I'm experiencing an issue with the docker deployment using a custom embedding model. I'm trying to use BAAI/bge-large-en-v1.5.

Below is my deployment and outputs.

Configure vars

mkdir env_scripts
cd env_scripts
touch env_scripts/env_vars.env
nano env_scripts/env_vars.env

cat env_scripts/env_vars.env

INTERNAL_API_KEY=qxzZOs0LPqjgkJiv3279rqqxHgzi9k5r
POSTGRES_USERNAME=postgres
POSTGRES_PASSWORD=SuperSecret
POSTGRES_DB=vectorflow
POSTGRES_HOST=postgres
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
EMBEDDING_QUEUE=embeddings
VDB_UPLOAD_QUEUE=vdb-upload
LOCAL_VECTOR_DB=qdrant
# Client Vars - Test
EMBEDDING_TYPE=HUGGING_FACE
VECTOR_DB_TYPE=QDRANT
TESTING_ENV=testing123

Deploy via docker

docker pull rabbitmq
docker pull postgres
docker pull qdrant/qdrant
docker-compose build --no-cache
docker-compose up -d

deploy HF model

cd src
docker build --file hugging_face/Dockerfile -t vectorflow_hf:latest .
docker run --network=vectorflow --name=vectorflow_hf -d --env-file=../env_scripts/env_vars.env vectorflow_hf:latest --model_name "BAAI/bge-large-en-v1.5"

All containers appear normal at this point

Testing

cat src/testing_client-custom.py

import os
import requests
import json
import dotenv
# Load environment variables from .env file
dotenv.load_dotenv(dotenv_path='../env_scripts/env_vars.env')

#####################
# Testing Variables #
#####################
filepath = './api/tests/fixtures/test_medium_text.txt'
url = "http://localhost:8000/embed"
internal_api_key = os.getenv("INTERNAL_API_KEY")
embedding_type = os.getenv("EMBEDDING_TYPE")
vector_db_type = os.getenv("VECTOR_DB_TYPE")
index_name = os.getenv("INDEX_NAME")
testing_environment = os.getenv("TESTING_ENV")

##################
# Testing script #
##################

headers = {
    "Authorization": internal_api_key,
}

data = {
    'EmbeddingsMetadata': json.dumps({
        "embeddings_type": embedding_type,
        "chunk_size": 384,
        "chunk_overlap": 128,
        "chunk_strategy": "PARAGRAPH",
        "hugging_face_model_name": "BAAI/bge-large-en-v1.5"
    }),
    'VectorDBMetadata': json.dumps({
        "vector_db_type": vector_db_type,
        "index_name": index_name,
        "environment": testing_environment
    })
}

files = {
    'SourceData': open(filepath, 'rb')
}

print("sending request")
response = requests.post(
    url,
    headers=headers,
    data=data,
    files=files
)

if response.status_code != 200:
    print(f"Error: {response.text}")
    print(f"Status code: {response.status_code}")
    exit(1)

response_json = response.json()
job_id = response_json['JobID']
print(f"Job ID: {job_id}")

# poll the server for the job status
url = f"http://localhost:8000/jobs/{job_id}/status"
job_status = None
while job_status != "COMPLETED" and job_status != "FAILED":
    headers = {
        "Authorization": internal_api_key,
    }

    response = requests.get(
        url,
        headers=headers
    )

    response_json = response.json()
    job_status = response_json['JobStatus']

print(f"Job status: {job_status}")

Docker logs from "vectorflow_worker" container

wait-for-it.sh: waiting 15 seconds for rabbitmq:5672
wait-for-it.sh: rabbitmq:5672 is available after 7 seconds
--- Logging error ---
Traceback (most recent call last):
  File "/app/worker/worker.py", line 231, in callback
    os.environ["VECTOR_DB_KEY"] = vector_db_key
  File "/usr/local/lib/python3.9/os.py", line 684, in __setitem__
    value = self.encodevalue(value)
  File "/usr/local/lib/python3.9/os.py", line 756, in encode
    raise TypeError("str expected, not %s" % type(value).__name__)
TypeError: str expected, not NoneType
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1083, in emit
    msg = self.format(record)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 927, in format
    return fmt.format(record)
  File "/usr/local/lib/python3.9/logging/__init__.py", line 663, in format
    record.message = record.getMessage()
  File "/usr/local/lib/python3.9/logging/__init__.py", line 367, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
  File "/app/worker/worker.py", line 283, in <module>
    start_connection()
  File "/app/worker/worker.py", line 276, in start_connection
    consume_channel.start_consuming()
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 851, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1510, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "/app/worker/worker.py", line 238, in callback
    logging.error('Error processing batch:', e)
Message: 'Error processing batch:'
Arguments: (TypeError('str expected, not NoneType'),)

Could you please point me in the right direction for resolving this?

Much appreciated!

Add Batch Write into Milvus

Swap out the normal upload in Milvus, shown below, for the batch update, for faster uploads.

def write_embeddings_to_milvus(upsert_list, vector_db_metadata):
    connections.connect("default",
        uri = vector_db_metadata.environment,
        token = os.getenv('VECTOR_DB_KEY')
    )

    collection = Collection(vector_db_metadata.index_name)
    if not collection:
        logging.error(f"Index {vector_db_metadata.index_name} does not exist in environment {vector_db_metadata.environment}")
        return None

    logging.info(f"Starting Milvus insert for {len(upsert_list)} vectors")
    batch_size = config.PINECONE_BATCH_SIZE
    vectors_uploaded = 0

    for i in range(0, len(upsert_list), batch_size):
        try:
            insert_response = collection.insert(upsert_list[i:i+batch_size])
            vectors_uploaded += insert_response.insert_count
        except Exception as e:
            logging.error('Error writing embeddings to milvus:', e)
            return None

    logging.info(f"Successfully uploaded {vectors_uploaded} vectors to milvus")
    return vectors_uploaded

This sample code should provide guidance - https://github.com/milvus-io/pymilvus/blob/master/examples/example_bulkinsert_json.py
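
For orientation, a hedged sketch of the kind of bulk-insert call that example demonstrates; the connection details, collection name and file path are placeholders, and exact usage should be verified against the linked pymilvus example (the JSON file must already be staged where Milvus can read it).

# Hypothetical sketch based on the linked pymilvus bulk-insert example; names are placeholders.
from pymilvus import connections, utility

connections.connect("default", uri="<milvus-uri>", token="<vector-db-key>")

task_id = utility.do_bulk_insert(
    collection_name="test-index",
    files=["row_based_data.json"],  # one JSON file of rows, prepared ahead of time
)
state = utility.get_bulk_insert_state(task_id)
print(state.state_name, state.row_count)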

Add Airbyte Integration

We got approval from the Airbyte team to add a connector to their repo, making VectorFlow a target destination.

The two systems should be integrated so that data can be passed to VectorFlow via Airbyte.

Add Task to Github Actions that Syncs the objects in src/models and client/models

VectorFlow is a mono-repo that contains different representations of the same underlying concepts. The classes that appear in both src/models and client/models have the same fields, which need to be kept in sync. Due to the database-specific logic inside src/models, the classes cannot be consolidated.

A GitHub Actions task needs to check that these classes are in sync and flag when they are out of sync; a rough sketch of such a check is below.
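
A minimal sketch of the script the Actions task could run; the module paths and the way fields are compared are assumptions about the repo layout, not existing code.

# Hypothetical sync check: compare public attribute names of same-named classes
# in src/models and client/models. Module paths are assumptions.
import importlib
import inspect
import sys

def public_fields(cls):
    return {name for name in vars(cls) if not name.startswith("_")}

def main():
    src = importlib.import_module("src.models.batch")      # assumed module path
    client = importlib.import_module("client.models.batch")  # assumed module path
    mismatches = []
    for name, src_cls in inspect.getmembers(src, inspect.isclass):
        client_cls = getattr(client, name, None)
        if client_cls and public_fields(src_cls) != public_fields(client_cls):
            mismatches.append(name)
    if mismatches:
        print(f"Out of sync: {mismatches}")
        sys.exit(1)

if __name__ == "__main__":
    main()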

Remove duplicate code

Methods such as update_batch_and_job_status are duplicated across the codebase to prevent different modules from being dependent on each other.

We need to 1) find and document all the repetitive code, 2) move it into a shared or utils location, and 3) make sure the duplication is removed from the existing code.

Bug: Locally, errors are not returned to the user

When hitting the /embed endpoint with incorrect parameters, a 500 Internal Server Error is thrown instead of the error from within the application.

Examples:

  • No pinecone credentials / incorrect pinecone credentials
  • Missing / incorrect EmbeddingsMetadata
  • Missing / incorrect VectorDBMetadata
<!doctype html>
<html lang=en>
<title>500 Internal Server Error</title>
<h1>Internal Server Error</h1>
<p>The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.</p>

Add file name to metadata schema

It's important to know which source document a piece of text came from, and the system should support this. Probably the best way to do this is to add an extra field to the job that tracks it, then fetch it from the database prior to vector DB upload. This way we can create an SQL report that detects failed jobs and knows which source documents to retry.

Implement Log Aggregation & Searching

VectorFlow has many logs spread over different containers. We need these logs to be aggregated into a searchable form.

One option could be to use Kibana with Elasticsearch. If the logs have metrics in them, we may want to put those into Prometheus.

Swap Flask API for FastAPI

Right now we are running Flask with Gunicorn using PM2. There are a few problems with this:

  • Flask is not fast
  • PM2 is designed for Node.js, not Python

We should swap Flask for FastAPI and (if possible in one PR) swap PM2 for something more appropriate.

Note: We currently depend on Flask-SQLAlchemy for our DB ORM. We will need to swap this when migrating away from Flask. This may also require changes to the DB services that are also dependencies for the worker and the VDB upload worker.

Arguflow Use-Case

Urgent needs of Arguflow

1. Webhook support

We need the embedding vectors, along with their payloads, returned as some consistent type within a payload to a webhook route we can specify.

3. Collision detection

We want to provide the following RPC route and a threshold full_text_score.

  • get_top_full_text_match that returns {id: uuid, full_text_score: float32}

We would like that to get called before using GPU or a usage-priced call to a provider to make the embedding. If the score exceeds a given threshold then we don't need the embedding and just want the index in the webhook request payload that would contain the embedding to have some collision_id: id that marks it as a collision. Our system then handles marking it as such.

It would be cool if we could also provide the following RPC and a threshold distance.

  • get_top_semantic_match that returns {id: uuid, distance: float32}

If the distance exceeds the threshold we would want to do the same as above. This semantic distance check is not required but would be a nice add. If we don't have it, then we will have to implement it on our end to validate the embeddings returned in the webhook request payload.

4. Document id's

job_id seems like a sort of proxy for a document_id. We don't want to have to track these on our end.

So sending a document and then getting back a job_id would require us to associate the uuid of the document to the job_id so that we can link them when the vectors come into the webhook.

We want to be able to send a document_id on our submission of the document w/ the signed S3 URL and get that document_id back in the request payload on the webhook hit.

5. Email chunker

Emails typically export as HTML or XML. We extract all of the innerText into a txt file before starting to chunk.

For emails, we want one email to constitute a chunk. However, emails are typically chains of several individual emails that have to get parsed.

As the chunking goes, we need to track when emails end. To do so, you want to look for things like 'regards,', 'thanks,', etc.:

words_trigger_email_end = ['forwarded message', 'has invited you', 'open in', 'google llc', 'original message', 'original message follows', '───', '--', '***', '===', 'regards,', 'from,', 'sincerely,', 'yours,', 'regards,', 'gratitude,', 'appreciation,', 'care,', 'cheers,', 'cordially,', 'gratitude,', 'respectfully,', 'warmly,', 'best,', 'wishes,', 'humbly,', "thanks,"]

We also have to track when emails start because you don't want the noise between emails with things like confidential notices and PII:

words_trigger_email_start = ['to:', 'cc:', 'from:', 'date:', 'sent:', 'subject:', 're:', 'fw:', 'fwd:', 'attachments:', 'attached:', 'wrote:']

Then, we also need to track the keys on the email:

keys_to_track = ['to:', 'cc:', 'from:', 'date:', "sent:"]

Every line of the txt basically has to get split on those keys, and then you need to grab the 1 index and add the KV pairs to the metadata that will be included on the embedding type returned in the webhook payload.
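
A minimal sketch of that key/value extraction step; the split-and-take-index-1 approach mirrors the description above, while the metadata dict shape is an assumption.

# Hypothetical sketch of pulling header key/value pairs out of an email line.
# keys_to_track mirrors the list above; the metadata dict shape is an assumption.
keys_to_track = ['to:', 'cc:', 'from:', 'date:', 'sent:']

def extract_email_metadata(line, metadata):
    lowered = line.lower()
    for key in keys_to_track:
        if key in lowered:
            # split on the key and grab the 1 index, as described above
            metadata[key.rstrip(':')] = lowered.split(key, 1)[1].strip()
    return metadata

print(extract_email_metadata("From: alice@example.com", {}))  # {'from': 'alice@example.com'}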

There is also an edge-case here where emails can sometimes exceed a given max_chunk size and then need to get split. Handling that is very hard.

I will try to share our code soon, but it has a bunch of hardcoded stuff related to PO's we signed such that it requires significant editing.

6. Sentence/Line ignore triggers during chunking

Assuming an OCR parse into txt, these ignores are typically line based, because OCR does not do well with punctuation in our testing. Otherwise these ignores are usually sentence based.

If a word in the array we pass when queuing is in a line or sentence, we want to skip over that line/sentence during chunking:

words_to_trigger_line_ignore = ['to:', 'cc:', 'from:', 'date:', "sent:", "forwarded message", "@", "───", "meeting id:", "password:", "all rights reserved", "has invited you", "google llc", "recognize the sender's email", "docs.google.com", "external sender", "trust this email", "content is safe", "proof of sender", "do not click", "mentioned in this thread", "google docs sends"]

Future needs

For marketing purposes we are standing up a bunch of "chat with YouTuber foo" demos. That means we are using Whisper to chunk audio files.

We need support for doing this kind of chunking. Here is a full implementation we think can mostly be copied.

Research whether `extract_for_token_limit` needs to be updated to support 1106 models.

Hello from LinkedIn!

I noticed the following function appears to assume the user will use one of "gpt-4", "gpt-4-32k", or "gpt-3.5-turbo-16k", otherwise makes some assumptions about model context limits (defaults to gpt-4), and returns a portion of document[] as a function of remaining_tokens. Assuming people are using the 1106 models, is this function still doing what is intended?

    def extract_for_token_limit(self, document, questions):
        encoding = tiktoken.encoding_for_model(self.model)
        question_string = ",".join(questions)
        questions_count = len(encoding.encode(question_string))
        user_prompt_count = len(encoding.encode(self.usecase_enhancement_user_prompt))
        system_prompt_count = len(encoding.encode(self.usecase_enhancement_system_prompt))
        extra_count = len(encoding.encode("'role', 'system', 'content', 'role', 'user', 'content'"))
        token_limit = 8192

        if "16k" in self.model:
            token_limit = 16384
        elif "32k" in self.model:
            token_limit = 32768

        remaining_tokens = token_limit - (questions_count + user_prompt_count + system_prompt_count + extra_count)
        document_encoding = encoding.encode(document)
        if len(encoding.encode(document)) <= remaining_tokens:
            return document
        
        #return encoding.decode(document_encoding[:remaining_tokens])
        return document[:remaining_tokens*3]

Additionally, I would suggest using a proper text-splitter tool in order to return a more precise slice of the document based on the actual token count. There are examples at both extremes where tokens-per-character is not near the assumed values of 3 and 4.

extreme tokens to characters
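
For reference, a short sketch of the token-precise variant the commented-out line above hints at, assuming tiktoken (already used by the function) is available.

# Hypothetical token-precise truncation with tiktoken, instead of the characters ≈ tokens * 3 heuristic.
import tiktoken

def truncate_to_tokens(document: str, remaining_tokens: int, model: str = "gpt-4") -> str:
    encoding = tiktoken.encoding_for_model(model)
    tokens = encoding.encode(document)
    if len(tokens) <= remaining_tokens:
        return document
    # decode exactly remaining_tokens tokens back into text
    return encoding.decode(tokens[:remaining_tokens])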

Add support for different chunking strategies

Right now vectorflow only supports chunking by an exact length passed in by the user, independent of how the content is structured. Ideally the user should be able to specify whether they want this strategy, or chunking by paragraph, by sentence, etc.
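
A minimal sketch of what the alternative strategies could look like alongside exact-length chunking; the function names and splitting rules are placeholders, not the current chunker.

# Hypothetical sketch of alternative chunk strategies; names and regexes are placeholders.
import re

def chunk_exact(text, chunk_size, overlap=0):
    step = max(chunk_size - overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

def chunk_by_paragraph(text):
    # split on blank lines
    return [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]

def chunk_by_sentence(text):
    # naive sentence split on terminal punctuation
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]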

Add top k semantic search endpoint

What To Build

  • Add an endpoint to the API that allows the user to pass in a text-only search term, embed it with a supported model (right now only the 3 OpenAI embedding models), and query the vector DB (see the sketch after this list)
  • This should return the raw vectors plus the associated metadata. VectorFlow's standard metadata schema includes the source text
  • Make the default K 5 or 10, but enable the user to set this
  • Add a method that does this to the vectorflow client
  • Add a script to the testing scripts that serves as an end-to-end test of the whole system
  • No reranker is currently needed. If there is demand, this can be a follow-up task
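
A minimal sketch of such an endpoint, assuming Flask, the openai>=1.0 client and qdrant-client; the route name, request shape and model choice are assumptions, not the actual VectorFlow API.

# Hypothetical /search endpoint sketch; route, request shape and client calls are assumptions.
import os
from flask import Flask, request, jsonify
from openai import OpenAI
from qdrant_client import QdrantClient

app = Flask(__name__)
openai_client = OpenAI(api_key=os.getenv("OPEN_AI_KEY"))
qdrant = QdrantClient(host="localhost", port=6333)

@app.route("/search", methods=["POST"])
def search():
    body = request.get_json()
    query, index_name = body["query"], body["index_name"]
    top_k = body.get("top_k", 5)  # default K of 5, overridable by the user

    embedding = openai_client.embeddings.create(
        model="text-embedding-ada-002", input=query
    ).data[0].embedding

    hits = qdrant.search(
        collection_name=index_name, query_vector=embedding, limit=top_k, with_vectors=True
    )
    # return raw vectors plus metadata (the standard schema includes the source text)
    return jsonify([
        {"id": h.id, "score": h.score, "vector": h.vector, "payload": h.payload} for h in hits
    ])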

Add Base Docker Image Containing Python Dependencies

Our Docker builds are too slow because they reinstall dependencies that rarely change on every build.

Refactor our Docker images so that all the dependencies live in their own base image - this will drastically reduce build times.

This new base image will need to have versioning. In a follow-up task, we may need to implement some type of semantic versioning and automatic rebuild system so that if dependencies change, the base image is updated during a build of the whole vectorflow engine.

Multi-file ingestion from S3

Add functionality to stream in a whole directory from S3. This will likely require a separate worker to be spun up to stream in the files so it does not block the API.
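
A minimal sketch of what streaming a whole S3 prefix could look like with boto3; the bucket, prefix and the process_file hook are placeholders, not existing VectorFlow code.

# Hypothetical sketch of streaming every object under an S3 prefix; names are placeholders.
import boto3

def ingest_directory(bucket: str, prefix: str, process_file) -> None:
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"]
            process_file(obj["Key"], body)  # hand each file stream to the embedding pipeline

# Run from a dedicated worker so the API process is never blocked, e.g.:
# ingest_directory("my-bucket", "documents/", process_file=submit_for_embedding)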

Support larger file sizes

The default Gunicorn timeout is 30 seconds. Even extending that to 5 minutes, the system would still time out with very large files.

The system needs to be rearchitected so that the creation of batches is done by a separate worker or process from the one handling HTTP requests.

Add RAG search capabilities

It is highly recommended to do the semantic search endpoint issue ticket first.

What To Build

  • Add an endpoint to the API that allows the user to perform RAG with a given text-only search term. Use the following prompt for the RAG:
retrieval_prompt = (
            "Context information is below.\n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "Given the context information and not prior knowledge, "
            "answer the query.\n"
            "Query: {query_str}\n"
            "Answer: "
        )
  • Use the semantic search endpoint created here (see the sketch after this list)
  • No reranker is needed yet
  • No query pre-processing is needed yet
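
A minimal sketch of how the RAG endpoint could combine the prompt above with the semantic search endpoint; the /search route, response shape, model choice and env var names are assumptions.

# Hypothetical RAG sketch reusing the semantic search endpoint and the prompt above.
import os
import requests
from openai import OpenAI

retrieval_prompt = (
    "Context information is below.\n"
    "---------------------\n"
    "{context_str}\n"
    "---------------------\n"
    "Given the context information and not prior knowledge, "
    "answer the query.\n"
    "Query: {query_str}\n"
    "Answer: "
)

def rag_answer(query: str, index_name: str) -> str:
    # call the (assumed) /search endpoint from the previous issue
    hits = requests.post(
        "http://localhost:8000/search",
        headers={"Authorization": os.getenv("INTERNAL_API_KEY")},
        json={"query": query, "index_name": index_name, "top_k": 5},
    ).json()
    context_str = "\n".join(hit["payload"]["source_text"] for hit in hits)

    prompt = retrieval_prompt.format(context_str=context_str, query_str=query)
    client = OpenAI(api_key=os.getenv("OPEN_AI_KEY"))
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content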

Add validator service to API

The API file upload methods have about 15 lines of duplicated request-validation code across several methods.

This should be moved into a separate validator service.

Add a duplicate prevention mechanism

Right now most vector DBs allow uploading duplicate vectors as long as the ID is unique. It might be beneficial for vectorflow to 1) prevent the uploading of duplicate vectors and 2) deduplicate an existing index/collection/class object store.

But is it possible to enforce these things in a performant way? One option is sketched below.
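
One performant option worth exploring is deterministic IDs derived from the chunk text, so that re-uploading the same chunk overwrites rather than duplicates it; a minimal sketch, with the ID scheme as an assumption.

# Hypothetical sketch: derive a deterministic vector ID from the source text so that
# upserting the same chunk twice is idempotent rather than creating a duplicate.
import hashlib
import uuid

def deterministic_vector_id(source_text: str, index_name: str) -> str:
    digest = hashlib.sha256(f"{index_name}:{source_text}".encode("utf-8")).hexdigest()
    # UUIDs are widely accepted as point IDs; uuid5 keeps the result stable across runs
    return str(uuid.uuid5(uuid.NAMESPACE_URL, digest))

print(deterministic_vector_id("some chunk of text", "test-index"))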
