TASKS

Cognee uses tasks grouped into pipelines to populate graph and vector stores. These tasks are designed to analyze and enrich your data, improving the answers generated by Large Language Models (LLMs).

In this section, you'll find a template that you can use to structure your data and build pipelines. It is designed to help you get started with cognee and build reliable LLM pipelines.

Task 1: Category Extraction

Data enrichment is the process of enhancing raw data with additional information to make it more valuable. This template is a sample task that extracts categories from a document and populates a graph with the extracted categories.

Let's go over the steps to use this template (the full code is provided here):

This function is designed to classify chunks of text using a specified language model. The goal is to categorize the text, map relationships, and store the results in a vector engine and a graph engine. The function is asynchronous, allowing for concurrent execution of tasks like classification and data point creation.

Parameters

  • data_chunks: list[DocumentChunk]: A list of text chunks to be classified. Each chunk represents a piece of text and includes metadata like chunk_id and document_id.
  • classification_model: Type[BaseModel]: The model used to classify each chunk of text. This model is expected to output labels that categorize the text (a sketch of such a model follows).
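
To make the classification_model parameter concrete, here is a minimal sketch of what such a model might look like, assuming a Pydantic schema whose label carries a type and a list of subclass values. The class names, field names, and categories below are illustrative assumptions, not the model shipped with cognee:

from enum import Enum

from pydantic import BaseModel

# Hypothetical categories; replace them with your own taxonomy.
class TextSubclass(str, Enum):
    ARTICLES = "Articles, essays, and reports"
    NEWS = "News stories and blog posts"

class ClassificationLabel(BaseModel):
    type: str = "Text"
    subclass: list[TextSubclass]

class TextClassification(BaseModel):
    label: ClassificationLabel

Passing TextClassification as classification_model would make extract_categories return one such object per chunk, with label.type naming the broad category and label.subclass listing the finer-grained ones.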

Steps in the Function

Check for Empty Input

if len(data_chunks) == 0:
    return data_chunks

If there are no data chunks provided, the function returns immediately with the input list (which is empty).

Classify Each Chunk

chunk_classifications = await asyncio.gather(
    *[extract_categories(chunk.text, classification_model) for chunk in data_chunks],
)

The function uses asyncio.gather to concurrently classify each chunk of text. extract_categories is called for each chunk, and the results are collected in chunk_classifications.
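
Because asyncio.gather preserves input order, each classification can later be paired back to its chunk by index. A minimal illustration of that pairing, using only the variables already defined above:

for chunk_index, chunk in enumerate(data_chunks):
    # The result at position i corresponds to data_chunks[i].
    chunk_classification = chunk_classifications[chunk_index]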

Initialize Data Structures

classification_data_points = []

A list is initialized to store the classification data points that will be used later for mapping relationships and storing in the vector engine.

Generate UUIDs for Classifications

The function loops through each chunk and generates unique identifiers (UUIDs) for both the main classification type and its subclasses:

classification_data_points.append(uuid5(NAMESPACE_OID, chunk_classification.label.type))
classification_data_points.append(uuid5(NAMESPACE_OID, classification_subclass.value))

These UUIDs are used to uniquely identify classifications and ensure consistency.
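
uuid5 is deterministic: the same namespace and name always yield the same UUID, which is what keeps identifiers consistent across runs and prevents duplicate classification nodes. A quick standalone check:

from uuid import NAMESPACE_OID, uuid5

# Re-running the task produces the same UUID for the same label.
assert uuid5(NAMESPACE_OID, "Text") == uuid5(NAMESPACE_OID, "Text")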

Retrieve or Create Vector Collection

vector_engine = get_vector_engine()
collection_name = "classification"

The function interacts with a vector engine. It checks if the collection named "classification" exists. If it does, it retrieves existing data points to avoid duplicates. Otherwise, it creates the collection.
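
A rough sketch of that check-then-create flow is shown below; the has_collection, create_collection, and retrieve method names are assumptions used to illustrate the pattern, and the real adapter API may differ:

if await vector_engine.has_collection(collection_name):
    # Fetch classification points stored previously so they are not re-inserted.
    existing_data_points = await vector_engine.retrieve(collection_name, classification_data_points)
else:
    await vector_engine.create_collection(collection_name)
    existing_data_points = []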

Prepare Data Points, Nodes, and Edges

The function then builds a list of data_points (representing the classification results) and constructs nodes and edges to represent relationships between chunks and their classifications:

data_points.append(DataPoint[Keyword](...))
nodes.append((...))
edges.append((...))

  • Nodes: Represent classifications (e.g., media type, subtype).
  • Edges: Represent relationships between chunks and classifications (e.g., "is_media_type", "is_subtype_of").
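
One plausible layout for these entries, assuming each node is an (id, properties) pair and each edge is a (source_id, target_id, relationship_name, properties) tuple (the shapes here are illustrative, not prescribed by cognee):

classification_type_id = uuid5(NAMESPACE_OID, chunk_classification.label.type)

# A node for the media type, keyed by its deterministic UUID.
nodes.append((
    str(classification_type_id),
    dict(id=str(classification_type_id), name=chunk_classification.label.type),
))

# An edge linking the chunk to its media type.
edges.append((
    str(chunk.chunk_id),
    str(classification_type_id),
    "is_media_type",
    dict(relationship_name="is_media_type"),
))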

Create Data Points and Relationships

If there are new nodes or edges to add, the function stores the data points in the vector engine and updates the graph engine with the new nodes and edges:

await vector_engine.create_data_points(collection_name, data_points)
await graph_engine.add_nodes(nodes)
await graph_engine.add_edges(edges)
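
Note that graph_engine is assumed to already be in scope here; it would typically be obtained from a helper analogous to get_vector_engine, for example (the name and import path are assumptions and may vary by cognee version):

from cognee.infrastructure.databases.graph import get_graph_engine  # assumed import path

graph_engine = await get_graph_engine()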

Return the Processed Chunks

Finally, the function returns the processed data_chunks, which can then be consumed by subsequent tasks in the pipeline:

return data_chunks

Pipeline 1: cognee pipeline

This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores with the results.

This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.

Parameters

  • datasets: Union[str, list[str]] = None: A string or list of dataset names to be processed.
  • user: User = None: The user requesting the processing. If not provided, the default user is retrieved.
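
Before stepping through the implementation, here is a minimal usage sketch. The file path and dataset name are placeholders, and argument details may differ slightly between cognee versions:

import asyncio

import cognee

async def main():
    # Ingest a local file into a named dataset, then run the cognify pipeline on it.
    await cognee.add("data/example.txt", dataset_name="example_dataset")
    await cognee.cognify(["example_dataset"])

asyncio.run(main())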

Steps in the Function

Database Engine Initialization

db_engine = get_relational_engine()

The function starts by getting an instance of the relational database engine, which is used to retrieve datasets and other necessary data.

Handle Empty or String Dataset Input

if datasets is None or len(datasets) == 0:
    return await cognify(await db_engine.get_datasets())
if isinstance(datasets[0], str):
    datasets = await retrieve_datasets(datasets)

If no datasets are provided, the function retrieves all available datasets from the database. If a list of dataset names (strings) is provided, they are converted into dataset objects.

User Authentication

if user is None:
    user = await get_default_user()

If no user is provided, the function retrieves the default user.

Run Cognify Pipeline for Each Dataset

async def run_cognify_pipeline(dataset: Dataset):
    # Pipeline logic goes here...

The run_cognify_pipeline function is defined within cognify and is responsible for processing a single dataset. This is where most of the heavy lifting occurs.

Retrieve Dataset Data

The function fetches all the data associated with the dataset.

data: list[Data] = await get_dataset_data(dataset_id=dataset.id)

Create Document Objects

Based on the file type (e.g., PDF, Audio, Image, Text), corresponding document objects are created.

documents = [...]
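
A sketch of that dispatch, assuming document classes such as PdfDocument, AudioDocument, ImageDocument, and TextDocument keyed by the stored file extension (the class names, field names, and constructor arguments are illustrative assumptions):

# Hypothetical mapping from file extension to document class.
EXTENSION_TO_DOCUMENT = {
    "pdf": PdfDocument,
    "mp3": AudioDocument,
    "png": ImageDocument,
}

documents = [
    EXTENSION_TO_DOCUMENT.get(data_item.extension, TextDocument)(
        id=data_item.id,
        title=data_item.name,
        raw_data_location=data_item.raw_data_location,
    )
    for data_item in data
]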

Check Permissions

The user's permissions are checked to ensure they can access the documents.

await check_permissions_on_documents(user, "read", document_ids)

Pipeline Status Logging

The function logs the start and end of the pipeline processing.

async with update_status_lock:
    task_status = await get_pipeline_status([dataset_id])
    if dataset_id in task_status and task_status[dataset_id] == "DATASET_PROCESSING_STARTED":
        logger.info("Dataset %s is already being processed.", dataset_name)
        return
    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_STARTED", {...})

Pipeline Tasks

The pipeline consists of several tasks, each responsible for different parts of the processing:

  • document_to_ontology: Maps documents to an ontology structure.
  • source_documents_to_chunks: Splits documents into chunks.
  • chunk_to_graph_decomposition: Defines the graph structure for chunks.
  • chunks_into_graph: Integrates chunks into the knowledge graph.
  • chunk_update_check: Checks for updated or new chunks.
  • save_chunks_to_store: Saves chunks to a vector store and graph database.

  • chunk_extract_summary and chunk_naive_llm_classifier: Run in parallel to summarize and classify chunks.
  • chunk_remove_disconnected: Cleans up obsolete chunks.

The tasks are managed and executed asynchronously using the run_tasks and run_tasks_parallel functions.
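
A condensed sketch of how such a task list could be assembled, assuming a Task wrapper that binds each function to its keyword arguments and a run_tasks_parallel helper for the parallel pair (the wrapper name and the specific keyword arguments shown are assumptions based on the task names above):

tasks = [
    Task(document_to_ontology, root_node_id="root"),
    Task(source_documents_to_chunks, parent_node_id="root"),
    Task(chunk_to_graph_decomposition, task_config={"batch_size": 10}),
    Task(chunks_into_graph, task_config={"batch_size": 10}),
    Task(chunk_update_check, collection_name="chunks"),
    Task(save_chunks_to_store, collection_name="chunks"),
    run_tasks_parallel([
        Task(chunk_extract_summary),
        Task(chunk_naive_llm_classifier, classification_model=TextClassification),
    ]),
    Task(chunk_remove_disconnected),
]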

pipeline = run_tasks(tasks, documents)
async for result in pipeline:
    print(result)

Handle Errors

If any errors occur during processing, they are logged, and the exception is raised.

except Exception as error:
    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERROR", {...})
    raise error

Processing Multiple Datasets

The function prepares to process multiple datasets concurrently using asyncio.gather.

awaitables = []
for dataset in datasets:
    dataset_name = generate_dataset_name(dataset.name)
    if dataset_name in existing_datasets:
        awaitables.append(run_cognify_pipeline(dataset))
return await asyncio.gather(*awaitables)