Twitter Stream in Neo4J: See Tweets Differently

The idea here is to show a concrete example of an embedded Neo4J server that represents the activity of a Twitter stream.

Before coding, we have to think about how to represent nodes and relationships from the Twitter world. In my tool, I usually represent Twitter entities like this:

A node is a Twitter entity (Twitt, User, Hashtag, URL, Media), and relationships link these entities to each other.

This is what I call a network logic, and it works more or less like this:

  • user-[twitt]->twitt
  • twitt-[mention]->user
  • twitt-[hashtag]->hashtag
  • twitt-[URL]->URL
  • twitt-[Media]->Media
  • user-[RT]->twitt
  • user-[RT]->user
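To make the network logic concrete, here is a minimal plain-Java sketch that turns one simplified tweet into the list of relationships above. It uses neither Neo4J nor Twitter4J: `SimpleTweet` and `edgesFor` are hypothetical names invented for this illustration, and retweets and media are left out to keep it short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class NetworkLogicSketch {

    /** Hypothetical, simplified tweet: only the fields the network logic needs. */
    static final class SimpleTweet {
        final long id;
        final String author;
        final List<String> mentions;
        final List<String> hashtags;
        final List<String> urls;

        SimpleTweet(long id, String author, List<String> mentions,
                    List<String> hashtags, List<String> urls) {
            this.id = id;
            this.author = author;
            this.mentions = mentions;
            this.hashtags = hashtags;
            this.urls = urls;
        }
    }

    /** Lists the relationships of one tweet in the "source-[TYPE]->target" notation. */
    static List<String> edgesFor(SimpleTweet t) {
        List<String> edges = new ArrayList<>();
        // user-[twitt]->twitt
        edges.add("@" + t.author.toLowerCase(Locale.ROOT) + "-[TWITT]->" + t.id);
        // twitt-[mention]->user
        for (String m : t.mentions) {
            edges.add(t.id + "-[MENTION]->@" + m.toLowerCase(Locale.ROOT));
        }
        // twitt-[hashtag]->hashtag
        for (String h : t.hashtags) {
            edges.add(t.id + "-[HASHTAG]->#" + h.toLowerCase(Locale.ROOT));
        }
        // twitt-[URL]->URL
        for (String u : t.urls) {
            edges.add(t.id + "-[URL]->" + u);
        }
        return edges;
    }

    public static void main(String[] args) {
        SimpleTweet t = new SimpleTweet(42L, "Alice",
                Arrays.asList("Bob"), Arrays.asList("Neo4J"),
                Arrays.asList("http://example.org"));
        for (String edge : edgesFor(t)) {
            System.out.println(edge);
        }
    }
}
```

User and hashtag keys are lower-cased so that the same entity always maps to the same node, whatever the casing used in the tweet.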

Now the idea is to stream a topic (words and/or users) through the Twitter Streaming API (I use Twitter4J) and use Neo4J to store and query the result in near real time.

Implementation

I created a very simple class to handle the network logic and the initialization of the embedded Neo4J server.

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseBuilder;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.index.Index;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.server.WrappingNeoServerBootstrapper;

import twitter4j.HashtagEntity;
import twitter4j.MediaEntity;
import twitter4j.Status;
import twitter4j.URLEntity;
import twitter4j.UserMentionEntity;
public class Twitterj4neo {

	private static enum RelType implements RelationshipType
	{
		MENTION,
		TWITT,
		HASHTAG,
		URL,
		MEDIA,
		RT
	}

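	private static final String DB_PATH = "C:/neo4j-community-1.8.1-windows/TwittGraphTEST/";

	// Master index over every node, whatever its type
	private Index<Node> _entitiesIndex;

	// One node index per entity type
	private Index<Node> _UserIndex;
	private Index<Node> _TwittIndex;
	private Index<Node> _URLIndex;
	private Index<Node> _HashtagIndex;
	private Index<Node> _MediaIndex;

	// One relationship index per relationship type
	private Index<Relationship> _relMENTION;
	private Index<Relationship> _relTWITT;
	private Index<Relationship> _relHASHTAG;
	private Index<Relationship> _relURL;
	private Index<Relationship> _relMEDIA;
	private Index<Relationship> _relRT;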

	GraphDatabaseService _graphDB;

	static int _relcounter = 0;
	public Twitterj4neo()
	{
		createDB();
	}
	private void createDB()
	{
		_graphDB = new GraphDatabaseFactory().newEmbeddedDatabase(DB_PATH);
		_entitiesIndex= _graphDB.index().forNodes( "twitterEntities" );
		 _UserIndex= _graphDB.index().forNodes( "User" ); 
		_TwittIndex= _graphDB.index().forNodes( "Twitt" ); 
		 _URLIndex = _graphDB.index().forNodes( "URL" );
		_HashtagIndex= _graphDB.index().forNodes( "Hashtag" ); 
		_MediaIndex= _graphDB.index().forNodes( "Media" ); 

		_relMENTION = _graphDB.index().forRelationships("Mention");
		_relTWITT = _graphDB.index().forRelationships("Twitt");
		_relHASHTAG = _graphDB.index().forRelationships("Hashtag");
		_relURL = _graphDB.index().forRelationships("URL");
		_relMEDIA = _graphDB.index().forRelationships("Media");
		_relRT = _graphDB.index().forRelationships("RT");

		// Wrap the embedded database in a Neo4j server so that the web console can reach it
		WrappingNeoServerBootstrapper srv;
		srv = new WrappingNeoServerBootstrapper((GraphDatabaseAPI)  _graphDB );
		srv.start();

	}
	private Node createNodeType(Object id,String type)
	{
		Node statusNode = createNode(id);
		statusNode.setProperty("type", type);
		if(type.equals("User"))
		{
			_UserIndex.add(statusNode, "twitt_id", id);
		}
		if(type.equals("Hashtag"))
		{
			_HashtagIndex.add(statusNode, "twitt_id", id);
		}
		if(type.equals("URL"))
		{
			_URLIndex.add(statusNode, "twitt_id", id);
		}
		if(type.equals("Media"))
		{
			_MediaIndex.add(statusNode, "twitt_id", id);
		}
		if(type.equals("Twitt"))
		{
			_TwittIndex.add(statusNode, "twitt_id", id);
		}
		return statusNode;
	}
	private Node createNode(Object id)
	{
		Node statusNode;
		statusNode = _entitiesIndex.get( "twitt_id", id).getSingle();
		if(statusNode!=null)
		{
			return statusNode;
		}

		statusNode = _graphDB.createNode();
		statusNode.setProperty("twitt_id",id);
		_entitiesIndex.add( statusNode, "twitt_id", id );
		return statusNode;
	}
	public void newStatus(Status s)
	{
		Transaction tx = _graphDB.beginTx();
		try
		{
			Node statusNode = createNodeType(s.getId(),"Twitt");
			statusNode.setProperty("text", s.getText());
			if(s.getGeoLocation()!=null)
			{
				statusNode.setProperty("Geo_Lat", s.getGeoLocation().getLatitude());
				statusNode.setProperty("Geo_Long", s.getGeoLocation().getLongitude());
			}

			Node user = createNodeType("@"+s.getUser().getScreenName().toLowerCase(),"User");
			user.setProperty("name", s.getUser().getName());

			_relTWITT.add(user.createRelationshipTo(statusNode, RelType.TWITT),"rel_id",_relcounter++);

			if(s.isRetweet())
			{
				Status rts =s.getRetweetedStatus();
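				// Lower-case the screen name, as for every other user key, so the same user is not stored twice
				Node retweetedUser = createNodeType("@"+rts.getUser().getScreenName().toLowerCase(),"User");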
				retweetedUser.setProperty("name", rts.getUser().getName());

				Node retweetedTwitt = createNodeType(rts.getId(),"Twitt");
				retweetedTwitt.setProperty("text", rts.getText());
				if(rts.getGeoLocation()!=null)
				{
					retweetedTwitt.setProperty("Geo_Lat", rts.getGeoLocation().getLatitude());
					retweetedTwitt.setProperty("Geo_Long", rts.getGeoLocation().getLongitude());
				}
				_relTWITT.add(retweetedUser.createRelationshipTo(retweetedTwitt, RelType.TWITT),"rel_id",_relcounter++);
				_relRT.add(user.createRelationshipTo(retweetedUser, RelType.RT),"rel_id",_relcounter++);
				_relRT.add(user.createRelationshipTo(retweetedTwitt, RelType.RT),"rel_id",_relcounter++);

			}

			for(UserMentionEntity me:s.getUserMentionEntities())
			{
				if(!s.isRetweet() || (!me.getScreenName().equals(s.getRetweetedStatus().getUser().getScreenName())) )
				{
					Node mention = createNodeType("@"+me.getScreenName().toLowerCase(),"User");

					_relMENTION.add(statusNode.createRelationshipTo(mention, RelType.MENTION),"rel_id",_relcounter++);
				}
			}

			for(HashtagEntity hs:s.getHashtagEntities())
			{
				Node hashtagNode = createNodeType("#"+hs.getText().toLowerCase(),"Hashtag");
				_relHASHTAG.add(statusNode.createRelationshipTo(hashtagNode, RelType.HASHTAG),"rel_id",_relcounter++);
			}

			for(URLEntity url:s.getURLEntities())
			{
				Node urlNode = createNodeType(url.getExpandedURL(),"URL");
				_relURL.add(statusNode.createRelationshipTo(urlNode, RelType.URL),"rel_id",_relcounter++);
			}

			for(MediaEntity media:s.getMediaEntities())
			{
				Node mediaNode = createNodeType(media.getExpandedURL(),"Media");
				_relMEDIA.add(statusNode.createRelationshipTo(mediaNode, RelType.MEDIA),"rel_id",_relcounter++);
			}
		    tx.success();
		}
		catch(Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
		    tx.finish();
		}

	}
	public void closeDB()
	{
		_graphDB.shutdown();
	}

}

I need indexes to retrieve existing nodes. So I have a master index, twitterEntities, and several others that might be useful one day (I'm not advanced enough in Neo4J yet to know what to do with them).
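The role of the master index is easier to see in isolation. Here is a minimal sketch of the same get-or-create pattern in plain Java, where a `HashMap` stands in for the Neo4J index and `FakeNode` stands in for a graph node. Both are assumptions made for the illustration, not Neo4J types.

```java
import java.util.HashMap;
import java.util.Map;

class GetOrCreateSketch {

    /** Stand-in for a graph node: just a bag of properties. */
    static final class FakeNode {
        final Map<String, Object> properties = new HashMap<>();
    }

    // Plays the role of the master "twitterEntities" index: twitt_id -> node
    private final Map<Object, FakeNode> entitiesIndex = new HashMap<>();

    /** Returns the node already registered under this id, or creates and registers one. */
    FakeNode getOrCreate(Object id) {
        FakeNode existing = entitiesIndex.get(id);
        if (existing != null) {
            return existing;
        }
        FakeNode created = new FakeNode();
        created.properties.put("twitt_id", id);
        entitiesIndex.put(id, created);
        return created;
    }

    int size() {
        return entitiesIndex.size();
    }

    public static void main(String[] args) {
        GetOrCreateSketch graph = new GetOrCreateSketch();
        FakeNode first = graph.getOrCreate("@alice");
        FakeNode second = graph.getOrCreate("@alice"); // same key, so no new node
        graph.getOrCreate(42L);                        // a tweet id creates a second node
        System.out.println(first == second);
        System.out.println(graph.size());
    }
}
```

Without this lookup, every mention of the same user or hashtag would create a new node, and the graph would lose the connections that make it interesting.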

Now that it's complete, I plug it into my TwitterStreamer, which calls it every time it receives a Status.

Twitterj4neo neo = new Twitterj4neo();

@Override
public void onStatus(Status arg0) {

  neo.newStatus(arg0);
}

 

And that's it! Just run it and wait for the t(w)itts!

I created a special style to get a nice visualization of my entities. That's very easy and helps a lot.

For comparison, here is the Gephi representation (same network logic; here we see part of the complete graph).

Comments

The web console is "zuper toll" (super great, as we say here)! It is close to real time, and you can be more precise in your searches. I haven't mastered the query language yet, but it seems to be very powerful for complex queries.

On the other hand, it's not very usable for big graphs: let's say it's enough for 100 nodes at most, or the graph goes on strike. I use Gephi a lot, which is more graphical and more robust for large real-time graphs. I would say the two tools complement each other:
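As a first taste of that query language, here is a small sketch that builds a Cypher query listing the text of the tweets written by one user. It assumes the Cypher syntax of the Neo4J 1.x line (a `START` clause with an index lookup), the "User" index, and the "twitt_id" key used in the class above. `tweetsByUser` is a hypothetical helper, and I have not checked the query against other Neo4J versions.

```java
import java.util.Locale;
import java.util.regex.Pattern;

class CypherQuerySketch {

    // Only letters, digits and underscores are accepted, so nothing can break out of the quoted string
    private static final Pattern SCREEN_NAME = Pattern.compile("[A-Za-z0-9_]+");

    /** Builds a Cypher query returning the text of every tweet written by the given user. */
    static String tweetsByUser(String screenName) {
        if (screenName == null || !SCREEN_NAME.matcher(screenName).matches()) {
            throw new IllegalArgumentException("Invalid screen name: " + screenName);
        }
        // User nodes are indexed under "@" + lower-cased screen name
        String key = "@" + screenName.toLowerCase(Locale.ROOT);
        return "START u=node:User(twitt_id=\"" + key + "\") "
             + "MATCH u-[:TWITT]->t "
             + "RETURN t.text";
    }

    public static void main(String[] args) {
        // Paste the printed query into the web console
        System.out.println(tweetsByUser("Alice"));
    }
}
```

The relationship type in the `MATCH` clause is the name of the enum constant (`TWITT`), and `t.text` is the property set on each tweet node.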

  • Gephi for global and live approach
  • Neo4J for focus and afterward analysis

(There is a plugin for this in Gephi, but it's buggy on my side.)

I'm also eager to try Linkurious; maybe this tool will make playing with graphs more usable than the standard web console.
