TASKS¶
Cognee uses tasks grouped into pipelines to populate graph and vector stores. These tasks are designed to analyze and enrich your data, improving the answers generated by Large Language Models (LLMs).
In this section, you'll find a template that you can use to structure your data and build pipelines, helping you get started with cognee and build reliable LLM pipelines.
Task 1: Category Extraction¶
Data enrichment is the process of enhancing raw data with additional information to make it more valuable. This template is a sample task that extracts categories from a document and populates a graph with the extracted categories.
Let's go over the steps to use this template. The full code is provided here.
This function is designed to classify chunks of text using a specified language model. The goal is to categorize the text, map relationships, and store the results in a vector engine and a graph engine. The function is asynchronous, allowing for concurrent execution of tasks like classification and data point creation.
Parameters¶
- data_chunks (list[DocumentChunk]): A list of text chunks to be classified. Each chunk represents a piece of text and includes metadata like chunk_id and document_id.
- classification_model (Type[BaseModel]): The model used to classify each chunk of text. This model is expected to output labels that categorize the text.
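Put together, the task's signature looks roughly like the sketch below (the name classify_chunks is a placeholder, not necessarily the name used in cognee):
from typing import Type
from pydantic import BaseModel

async def classify_chunks(
    data_chunks: list["DocumentChunk"],
    classification_model: Type[BaseModel],
) -> list["DocumentChunk"]:
    ...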
Steps in the Function¶
Check for Empty Input¶
If there are no data chunks provided, the function returns immediately with the input list (which is empty).
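In code, this guard is a simple early return (a sketch consistent with the parameters above):
if len(data_chunks) == 0:
    return data_chunks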
Classify Each Chunk¶
chunk_classifications = await asyncio.gather(
    *[extract_categories(chunk.text, classification_model) for chunk in data_chunks],
)
The function uses asyncio.gather to concurrently classify each chunk of text. extract_categories is called for each chunk, and the results are collected in chunk_classifications.
Initialize Data Structures¶
A list is initialized to store the classification data points that will be used later for mapping relationships and storing in the vector engine.
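For example, the list used in the following steps can be initialized as:
classification_data_points = []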
Generate UUIDs for Classifications¶
The function loops through each chunk and generates unique identifiers (UUIDs) for both the main classification type and its subclasses:
classification_data_points.append(uuid5(NAMESPACE_OID, chunk_classification.label.type))
classification_data_points.append(uuid5(NAMESPACE_OID, classification_subclass.value))
These UUIDs are used to uniquely identify classifications and ensure consistency.
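Because uuid5 is deterministic, the same classification label always maps to the same identifier, which is what keeps repeated runs consistent:
from uuid import uuid5, NAMESPACE_OID

# The same input always yields the same UUID, so re-processing a document
# reuses the identifiers of categories that already exist.
assert uuid5(NAMESPACE_OID, "Text") == uuid5(NAMESPACE_OID, "Text")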
Retrieve or Create Vector Collection¶
The function interacts with a vector engine. It checks if the collection named "classification" exists. If it does, it retrieves existing data points to avoid duplicates. Otherwise, it creates the collection.
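A sketch of this check-and-create step, assuming vector engine methods named has_collection, retrieve, and create_collection (the exact API in cognee may differ):
collection_name = "classification"

if await vector_engine.has_collection(collection_name):
    # Fetch points that already exist so they are not written twice.
    existing_data_points = await vector_engine.retrieve(
        collection_name,
        [str(point_id) for point_id in classification_data_points],
    )
    existing_points_map = {point.id: True for point in existing_data_points}
else:
    # First run: the collection has to be created before data points can be stored.
    await vector_engine.create_collection(collection_name)
    existing_points_map = {}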
Prepare Data Points, Nodes, and Edges¶
The function then builds a list of data_points (representing the classification results) and constructs nodes and edges to represent relationships between chunks and their classifications:
- Nodes: Represent classifications (e.g., media type, subtype).
- Edges: Represent relationships between chunks and classifications (e.g., "is_media_type", "is_subtype_of").
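The node and edge payloads might look roughly like the following (the tuple layouts and field names are illustrative assumptions, not cognee's exact schema):
# One node per classification label, keyed by its deterministic UUID.
nodes.append((
    str(classification_type_id),
    dict(uuid=str(classification_type_id), name=chunk_classification.label.type, type="classification"),
))

# One edge per chunk-to-classification relationship.
edges.append((
    str(chunk.chunk_id),
    str(classification_type_id),
    "is_media_type",
    dict(relationship_name="is_media_type"),
))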
Create Data Points and Relationships¶
If there are new nodes or edges to add, the function stores the data points in the vector engine and updates the graph engine with the new nodes and edges:
await vector_engine.create_data_points(collection_name, data_points)
await graph_engine.add_nodes(nodes)
await graph_engine.add_edges(edges)
Return the Processed Chunks¶
Finally, the function returns the processed data_chunks, which can now be used further as needed:
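return data_chunks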
Pipeline 1: cognee pipeline¶
This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores with the results.
This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.
Parameters¶
- datasets (Union[str, list[str]], default None): A string or list of dataset names to be processed.
- user (User, default None): The user requesting the processing. If not provided, the default user is retrieved.
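For example, the pipeline could be invoked like this (the dataset name is hypothetical):
await cognify(datasets=["example_dataset"])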
Steps in the Function¶
Database Engine Initialization¶
The function starts by getting an instance of the relational database engine, which is used to retrieve datasets and other necessary data.
Handle Empty or String Dataset Input¶
if datasets is None or len(datasets) == 0:
    return await cognify(await db_engine.get_datasets())

if type(datasets[0]) == str:
    datasets = await retrieve_datasets(datasets)
If no datasets are provided, the function retrieves all available datasets from the database. If a list of dataset names (strings) is provided, they are converted into dataset objects.
User Authentication¶
If no user is provided, the function retrieves the default user.
Run Cognify Pipeline for Each Dataset¶
The run_cognify_pipeline function is defined within cognify and is responsible for processing a single dataset. This is where most of the heavy lifting occurs.
Retrieve Dataset Data¶
The function fetches all the data associated with the dataset.
Create Document Objects¶
Based on the file type (e.g., PDF, Audio, Image, Text), corresponding document objects are created.
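The dispatch can be pictured as a mapping from file extension to document class. The sketch below is self-contained and illustrative only; cognee's real PdfDocument, AudioDocument, ImageDocument, and TextDocument classes carry their own fields and constructors:
from dataclasses import dataclass

@dataclass
class Document:
    name: str
    file_path: str

class PdfDocument(Document): pass
class AudioDocument(Document): pass
class ImageDocument(Document): pass
class TextDocument(Document): pass

# Hypothetical extension-to-class mapping; unknown types fall back to TextDocument.
EXTENSION_TO_DOCUMENT = {
    "pdf": PdfDocument,
    "mp3": AudioDocument,
    "png": ImageDocument,
    "txt": TextDocument,
}

def to_document(name: str, file_path: str, extension: str) -> Document:
    document_class = EXTENSION_TO_DOCUMENT.get(extension, TextDocument)
    return document_class(name=name, file_path=file_path)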
Check Permissions¶
The user's permissions are checked to ensure they can access the documents.
Pipeline Status Logging¶
The function logs the start and end of the pipeline processing.
async with update_status_lock:
    task_status = await get_pipeline_status([dataset_id])

    if dataset_id in task_status and task_status[dataset_id] == "DATASET_PROCESSING_STARTED":
        logger.info("Dataset %s is already being processed.", dataset_name)
        return

    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_STARTED", {...})
Pipeline Tasks¶
The pipeline consists of several tasks, each responsible for different parts of the processing:
- document_to_ontology: Maps documents to an ontology structure.
- source_documents_to_chunks: Splits documents into chunks.
- chunk_to_graph_decomposition: Defines the graph structure for chunks.
- chunks_into_graph: Integrates chunks into the knowledge graph.
- chunk_update_check: Checks for updated or new chunks.
- save_chunks_to_store: Saves chunks to a vector store and graph database.
- Parallel tasks: chunk_extract_summary and chunk_naive_llm_classifier run in parallel to summarize and classify chunks.
- chunk_remove_disconnected: Cleans up obsolete chunks.
The tasks are managed and executed asynchronously using the run_tasks and run_tasks_parallel functions, as sketched below.
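The control flow behind these two helpers can be pictured with a minimal, generic sketch. This is not cognee's implementation, only an illustration of the behaviour described above: sequential tasks pass their output forward, while a parallel group runs its members concurrently on the same input.
import asyncio
from typing import Any, Awaitable, Callable, Iterable

TaskFn = Callable[[Any], Awaitable[Any]]

async def run_sequentially(tasks: Iterable[TaskFn], data: Any) -> Any:
    # Each task receives the previous task's output.
    for task in tasks:
        data = await task(data)
    return data

async def run_in_parallel(tasks: Iterable[TaskFn], data: Any) -> list[Any]:
    # All tasks receive the same input and run concurrently.
    return await asyncio.gather(*[task(data) for task in tasks])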
Handle Errors¶
If any errors occur during processing, they are logged, and the exception is raised.
except Exception as error:
    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERROR", {...})
    raise error
Processing Multiple Datasets¶
The function prepares to process multiple datasets concurrently using asyncio.gather.
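Concretely, the fan-out is the same asyncio.gather pattern used earlier, applied at the dataset level (run_cognify_pipeline is the inner function described above):
awaitables = [run_cognify_pipeline(dataset) for dataset in datasets]
return await asyncio.gather(*awaitables)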