Write a simple AI agent
TWITTER_BEARER_TOKEN=...
TWITTER_CONSUMER_KEY=...
TWITTER_CONSUMER_SECRET=...
TWITTER_ACCESS_TOKEN=...
TWITTER_ACCESS_TOKEN_SECRET=...
OPENAI_API_KEY=...empctl secrets upsert --file .envimport asyncio
import json
import os
import sys
from tweepy import Tweet
from tweepy.client import Client
from emp_hooks import twitter
from emp_hooks.hook_manager import hooks
from emp_agents import AgentBase
from emp_agents.providers import OpenAIProvider, OpenAIModelType
my_agent_twitter_id = "1902885085294235650"
@twitter.on_tweet("@empcloud_demo")
def on_tweet(tweet: Tweet):
print("RECEIVED TWEET")
data = json.loads(tweet["data"])
tweet_id = data["id"]
author_id = int(data["author_id"])
# ignore if the post is from the bot itself
if author_id == my_agent_twitter_id:
return
print(f"Received tweet: {tweet_id} from {author_id}")
sys.stdout.flush()
client = Client(
bearer_token=os.environ["TWITTER_BEARER_TOKEN"],
consumer_key=os.environ["TWITTER_CONSUMER_KEY"],
consumer_secret=os.environ["TWITTER_CONSUMER_SECRET"],
access_token=os.environ["TWITTER_ACCESS_TOKEN"],
access_token_secret=os.environ["TWITTER_ACCESS_TOKEN_SECRET"],
)
agent = AgentBase(
prompt="""
You are a helpful agent. You will respond in 200 characters to any questions, in a mechanical way.
You will respond as if you are an extra in a film. Make sure you be kind of depressing and reference
famous poetry in a really pretentious way when you get the chance.
""",
provider=OpenAIProvider(
api_key=os.environ["OPENAI_API_KEY"],
default_model=OpenAIModelType.gpt4o_mini,
)
)
agent_response = agent.answer(data["text"])
response = asyncio.run(agent_response)
try:
response = client.create_tweet(
text=response,
in_reply_to_tweet_id=tweet_id,
)
print(f"Tweeted: {response.data['text']}")
except Exception as e:
print(f"Error Tweeting: {e}")
sys.stdout.flush()
hooks.run(keep_alive=True)Wondering how the tweet arrives at the agent?
Last updated