%% > Published `=dateformat(this.created, "MMM dd, yyyy")` %% > Published Jan 12, 2025 # Unlocking Agentic AI with Crew AI: A Step-by-Step Guide to Data Analysis As AI systems are evolving, it is clear that we are moving towards Agentic AI workload meaning breaking down the tasks into individual autonomous agents with single responsibility principal and shift from more expensive bigger LLMs into smaller faster LLMs (large language models) or even SLMs (small language models) depending on complexity, analytical depth or reasoning nature of the tasks and each can perform actions by intelligently integrating information from diverse sources. In this post I am bringing a complete, working example of how to build an **Agentic AI** pipeline with **Crew AI** (a popular autonomous Agentic AI framework), pulling data from a local SQLite database, a CSV file, and Twitter(X) API, then performing an analysis and finally summarizing the results using a locally running language model —specifically, the **phi4** model which is considered **SLM** (small language model) installed and hosted via **Ollama**. We’ll walk through everything step by step, from installing dependencies to running the final code. --- # 1. Installing Conda via Homebrew (if needed) If you already have conda, or you have a way to install and manage python virtual environments, you can skip this part. Otherwise the best way to get started with python is through [miniforge](https://github.com/conda-forge/miniforge) and easiest way to start on MacOS for example is through Homebrew: ```bash # Install Homebrew if not installed /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" # Install Miniforge (recommended for conda-forge) brew install --cask miniforge # Initialize conda (zsh example) conda init zsh source ~/.zshrc ``` --- # 2. Creating and Activating a New Conda Environment Create a fresh environment to keep dependencies organized: ```bash conda create -n agenticAI python=3.10 conda activate agenticAI ``` --- # 3. Installing Dependencies Inside the `agenticAI` environment: ```bash conda install pandas numpy sqlalchemy requests jupyter tweepy -c conda-forge ``` Optionally add visualization libraries: ```bash conda install matplotlib seaborn -c conda-forge ``` Then install **Crew AI** from PyPI (assuming it’s published there): ```bash pip install crewai ``` --- # 4. Installing and Setting Up Ollama [Ollama](https://github.com/jmorganca/ollama) lets you run large language models locally on macOS. Install it via Homebrew: ```bash brew install ollama ``` Now, **pull** the **phi4** model (a smaller, faster local model well-suited for shorter summaries): ```bash ollama pull phi4 ``` We need to first open a separate terminal to run the ollama server using: ```bash ollama serve ``` finally for running the phi4 model we can run it and keep it alive while testing for 24 hours (you can always type /bye to get out of interactive chat): ```bash ollama run phi4 --keepalive=24h ``` --- # 5. Preparing the Data Sources We’ll create **three** sources: 1. **SQLite Database** containing employee data. 2. **CSV File** containing department goals. 3. **Twitter (X)** data about a hashtag (e.g., #YourCompany). ### 5.1. Creating a Sample SQLite Database 1. Create a file named `init_db.sql` with the following contents: ```sql CREATE TABLE IF NOT EXISTS employee_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, department TEXT, performance_score INTEGER ); INSERT INTO employee_data (name, department, performance_score) VALUES ('Alice', 'Engineering', 85), ('Bob', 'Engineering', 90), ('Charlie', 'HR', 75), ('Diana', 'Marketing', 88), ('Evelyn', 'Engineering', 92); ``` 2. Run the SQL script to create `employee_data` table inside `company_data.db`: ```bash sqlite3 company_data.db < init_db.sql ``` ### 5.2. Creating a Sample CSV File Create a file named `department_goals.csv`: ```csv department,monthly_sales_goal,quarterly_hiring_goal Engineering,50000,3 HR,0,2 Marketing,30000,1 ``` This CSV shows a hypothetical monthly sales goal and quarterly hiring goal for each department. ### 5.3. Twitter (X) API Setup 1. Create a Project and application on [developer.twitter.com](https://developer.twitter.com). 2. We are using V2 API. Get your **Bearer Token** from [developer.twitter.com](https://developer.twitter.com). 3. Export it as environment variables (adjust to your shell): ```bash export TWITTER_BEARER_TOKEN="YOUR_BEARER_TOKEN" ``` 3. We’ll use **tweepy** to fetch tweets about `#YourCompany` or any relevant hashtag. --- # 6. Full Python Code (Crew AI Agents + Ollama Integration) Create a file named `agentic_pipeline.py` (or whatever you like) with the following code. You can run it directly (e.g., `python agentic_pipeline.py`) **after** you’ve completed the setup above. ```python import os import sqlite3 import subprocess import pandas as pd import tweepy #################################### # 1. Define a minimal "Agent" base #################################### class SimpleAgent: def __init__(self, role, goal, backstory): self.role = role self.goal = goal self.backstory = backstory def run(self, context): raise NotImplementedError("Subclasses must implement run(context).") #################################### # 2. Define each specialized Agent #################################### class DatabaseAgent(SimpleAgent): def __init__(self): super().__init__( role="system", goal="Fetch employee data from local SQLite database", backstory="We have a table of employees with performance scores." ) def run(self, context): conn = sqlite3.connect("company_data.db") df_employees = pd.read_sql_query("SELECT * FROM employee_data;", conn) conn.close() context["db_data"] = df_employees class CSVAgent(SimpleAgent): def __init__(self): super().__init__( role="system", goal="Load department goals from a CSV file", backstory="We store monthly sales/hiring goals for each department." ) def run(self, context): df_goals = pd.read_csv("department_goals.csv") context["csv_data"] = df_goals class TwitterAgent(SimpleAgent): def __init__(self): super().__init__( role="system", goal="Fetch recent tweets for #YourCompany (v2)", backstory="We use the new Twitter API v2 via a Bearer Token." ) def run(self, context): bearer_token = os.environ.get("TWITTER_BEARER_TOKEN") if not bearer_token: print("No v2 Bearer token found. Using dummy tweets for demo.") sample_tweets = [ "Excited about #YourCompany's new product launch!", "Check out #YourCompany's latest feature!" ] context["twitter_data"] = sample_tweets return client = tweepy.Client(bearer_token=bearer_token) try: response = client.search_recent_tweets( query="#YourCompany", max_results=10, tweet_fields=["created_at", "author_id", "text", "public_metrics"] ) tweets = [tweet.text for tweet in response.data] if response.data else [] context["twitter_data"] = tweets except tweepy.TweepyException as e: print(f"Error fetching tweets with v2: {e}") context["twitter_data"] = [] class AnalysisAgent(SimpleAgent): def __init__(self): super().__init__( role="system", goal="Combine data from DB, CSV, and Twitter, then compute a short summary", backstory="We want to see how employees perform, compare with department goals, and track social sentiment." ) def run(self, context): db_data = context.get("db_data") csv_data = context.get("csv_data") twitter_data = context.get("twitter_data", []) if db_data is None or csv_data is None: context["analysis_summary"] = "Insufficient data to perform analysis." return # Basic stats from the employee DB avg_perf = db_data["performance_score"].mean() top_perf = db_data.loc[db_data["performance_score"].idxmax(), "name"] eng_count = len(db_data[db_data["department"] == "Engineering"]) # Merge department goals with aggregated employee data dept_group = ( db_data .groupby("department") .agg({"performance_score": "mean"}) .reset_index() .rename(columns={"performance_score": "avg_performance_score"}) ) merged_df = pd.merge(dept_group, csv_data, on="department", how="left") # Twitter stats tweet_count = len(twitter_data) analysis_summary = f""" --- EMPLOYEE DATABASE --- Average Performance Score: {avg_perf:.2f} Top Performer: {top_perf} Number of Engineers: {eng_count} --- DEPARTMENT GOALS (Merged Data) --- {merged_df.to_string(index=False)} --- TWITTER DATA --- Number of Tweets Fetched: {tweet_count} Sample Tweets: {twitter_data[:2] if tweet_count > 0 else "No tweets found"} """ context["analysis_summary"] = analysis_summary class LLMAgent(SimpleAgent): def __init__(self): super().__init__( role="system", goal="Generate a concise summary from the analysis using phi4 (Ollama)", backstory="We want a natural language overview of the combined data." ) def run(self, context): analysis_summary = context.get("analysis_summary", "") if not analysis_summary.strip(): context["llm_response"] = "No analysis data available to summarize." return prompt = f"""Summarize the following data and provide insights or recommendations. Data: {analysis_summary} Please focus on: - Observations about performance scores - Department goals - Any interesting patterns from Twitter data Provide a concise, human-readable response. """ try: # Pipe the prompt in via stdin instead of using flags result = subprocess.run( ["ollama", "run", "phi4"], input=prompt, # pass the prompt as stdin text=True, # treat stdin/stdout as text capture_output=True, # capture stdout/stderr check=True # raise error on non-zero exit ) llm_response = result.stdout except subprocess.CalledProcessError as e: llm_response = f"Error running local LLM: {e}" context["llm_response"] = llm_response #################################### # 3. Our own "Crew" manager #################################### class SimpleCrew: """ A simple orchestrator that runs each agent in order. This replaces the missing crew.run(). """ def __init__(self, agents): self.agents = agents def run_all(self): """ Build up a shared context dictionary by calling each agent in sequence. """ context = {} for agent in self.agents: agent.run(context) return context def main(): # Instead of using crewai.Crew, we use our SimpleCrew class crew = SimpleCrew(agents=[ DatabaseAgent(), CSVAgent(), TwitterAgent(), AnalysisAgent(), LLMAgent() ]) final_context = crew.run_all() final_summary = final_context.get("llm_response", "") print("\n\n========== FINAL LLM SUMMARY ==========") print(final_summary) if __name__ == "__main__": main() ``` ### Running the Script 1. Make sure you’ve **activated** your `agenticAI` environment: ```bash conda activate agenticAI ``` 2. Ensure that `company_data.db` (with the `employee_data` table) and `department_goals.csv` are in the same folder as `agentic_pipeline.py`. 3. If you have real Twitter API Token, export it in your terminal: ```bash export TWITTER_BEARER_TOKEN="YOUR_BEARER_TOKEN" ``` 4. Finally, run the pipeline: ```bash python agentic_pipeline.py ``` You should see console outputs from each agent, and finally a **concise summary** from your local **phi4** model. --- # 7. Conclusion We demonstrated we can create single purpose agents, each of which is focused on single task and for this demonstration we showed it can be done using locally run small language model like phi-4. and our final output will be something like ```md ========== FINAL LLM SUMMARY ========== ### Summary and Insights: #### Performance Scores: - **Overall Average**: The average employee performance score across all departments is 86.00, indicating a high level of performance among employees. - **Engineering Department**: This department stands out with an average performance score of 89.0, the highest among the three departments listed. It aligns well with its reputation for having top performers like Evelyn. - **HR and Marketing Comparison**: The HR department has a lower average performance score (75.0), suggesting potential areas for improvement or support. In contrast, Marketing is performing robustly with an average score of 88.0. #### Department Goals: - **Engineering**: With high performance scores and substantial monthly sales goals ($50,000), the Engineering department appears well-positioned to meet its objectives. Its quarterly hiring goal is also in line with the current number of engineers (3). - **HR**: Despite having no monthly sales goals, HR's focus might be on internal processes or employee welfare, which could explain the lower performance score compared to other departments. - **Marketing**: With a strong performance score and moderate sales goals ($30,000), Marketing seems effective. The department's hiring goal is minimal (1 per quarter), suggesting it may currently have sufficient staffing levels. #### Twitter Data: - Only two tweets were fetched about YourCompany, but both are positive and centered on product launches and new features. - **Pattern**: The language in the tweets ("Excited" and "Check out") suggests effective marketing efforts in engaging customers. This aligns with the high performance score of the Marketing department. ### Recommendations: 1. **Support for HR Department**: - Consider initiatives to enhance performance, such as training programs or workshops that could help improve their average score. 2. **Leverage Engineering Strength**: - Utilize the top performers in Engineering for mentorship roles across departments, potentially raising overall company performance scores. 3. **Marketing Continuity**: - Continue current marketing strategies given their apparent success in generating positive public interest and engagement. 4. **Monitor Twitter Engagement**: - Expand social media monitoring to gather more data on customer sentiment and further tailor product announcements for maximum impact. These insights provide a balanced view of the company's strengths and areas needing attention, leveraging departmental strengths while addressing any weaknesses proactively. ``` This approach demonstrates the power of **Agentic AI**: specialized agents can gather and transform data from various sources, then hand off to a local Language Model for natural language insights, all while maintaining data privacy and control by running a local model phi-4 which is super fast and inexpensive to run in any server. Feel free to experiment further by adding additional agents, external APIs, or more sophisticated analysis steps!